"""Grid that provides common functionality for different 2D grid types."""
from abc import ABC
import numpy as np
import xarray as xr
import pandas as pd
from scipy.interpolate import griddata
from numba import njit
from torx import Quantity
from torx.units_handling_m import check_units
from torx.vector import poloidal_vector
from torx.autodoc_decorators_m import autodoc_class
[docs]
@autodoc_class
class Grid2D(ABC):
"""2D regular grid on a poloidal cross section."""
[docs]
def __init__(self,
r_unstructured,
z_unstructured,
norm: Quantity=Quantity("1m"),
init_matrices=False):
"""
Create a grid with the unstructured (R,Z) coordinates provided.
Accepts optionally a filepath to read in GRILLIX trunk data.
Accepts optionally a length quantity to normalize the grid.
"""
self.r_u, self.r_s, r_spacing, r_size = \
self.basis_from_unstructured_array(r_unstructured)
self.z_u, self.z_s, z_spacing, z_size = \
self.basis_from_unstructured_array(z_unstructured)
self.r_u = xr.DataArray(self.r_u, dims="points")
self.r_s = xr.DataArray(self.r_s, dims="R")
self.z_u = xr.DataArray(self.z_u, dims="points")
self.z_s = xr.DataArray(self.z_s, dims="Z")
assert(r_size == z_size), \
"Expected equal sized coordinate arrays but found " \
f"{r_size} != {z_size}"
self.size = r_size
self.shape = (self.r_s.size, self.z_s.size)
self.r_spacing, self.z_spacing = r_spacing, z_spacing
if np.isclose(r_spacing, z_spacing):
self.spacing = r_spacing
self._norm = norm
self._parallel_length_scale = None
self._perpendicular_length_scale = None
self._flipped_z = False
if init_matrices:
self.setup_vector_to_matrix()
self.setup_filled_vector_to_matrix()
self.setup_perpendicular_gradient()
self._vector_to_matrix_initialized = True
self._perpendicular_gradient_initialized = True
else:
self._vector_to_matrix_initialized = False
self._perpendicular_gradient_initialized = False
@property
def R0(self):
"""Normalized radius."""
return self._norm
@R0.setter
def R0(self, value: Quantity):
"""Set the normalized radius."""
check_units(value, {"[length]":1}, "R0")
self._norm = value
[docs]
@staticmethod
def basis_from_unstructured_array(x_unstructured):
"""Find the spacing and unique values of unstructured coordinates."""
x_unstructured = np.array(x_unstructured)
assert x_unstructured.ndim == 1
x_structured = np.unique(x_unstructured)
x_structured = x_structured[~np.isnan(x_structured)]
spacing = np.mean(np.diff(x_structured))
assert np.allclose(
np.diff(x_structured), spacing
), "Error: basis vector is not equally spaced."
return x_unstructured, x_structured, spacing, x_unstructured.size
[docs]
def setup_vector_to_matrix(self):
"""Calculate arrays to improve the performance of vector to matrix."""
tricolumn_data = np.column_stack(
(self.r_u, self.z_u, np.arange(start=0, stop=self.size, dtype=int))
)
pd_dataframe = pd.DataFrame(tricolumn_data, columns=["x", "y", "z"])
# Makes a 2D array of indices
shaped_data = pd_dataframe.pivot_table(
values="z", index="y", columns="x", dropna=False
)
# Flatten array to give a vector
flattened_data = np.array(shaped_data).flatten()
# Replaces NaN with zero (will return the z[0] element in this place)
self.sort_indices = np.nan_to_num(flattened_data).astype("int")
# Add NaN vector to z[self.sort_indices] to replace z[0] with NaN
self.nan_vector = np.zeros_like(flattened_data)
self.nan_vector[np.isnan(flattened_data)] = np.nan
forward_sort = flattened_data[
np.logical_not(np.isnan(self.nan_vector))
].astype(int)
assert forward_sort.size == self.size
self.reverse_sort_indices = np.zeros_like(forward_sort)
self.reverse_sort_indices[forward_sort] = np.arange(forward_sort.size)
[docs]
def setup_filled_vector_to_matrix(self):
"""
Calculate the array to perform a filled vector to matrix.
Calculates the 'filled point index' array which can be used to
index an unstructured array and return a filled 2D array, using
nearest neighbor interpolation for off-grid points.
"""
point_index = xr.DataArray(np.arange(self.size), dims="points")
r_mesh, z_mesh = np.meshgrid(self.r_s, self.z_s)
self.filled_point_index = griddata(
((self.r_u, self.z_u)),
point_index,
(r_mesh, z_mesh),
method="nearest"
).astype(int)
[docs]
def setup_perpendicular_gradient(self):
"""
Calculate array to create a perpendicular gradient operator on the grid.
Create neighbor index arrays for left, right, up and down neighbors
that can be used to evaluate the perpendicular gradient
"""
point_index = xr.DataArray(np.arange(self.size), dims="points")
# The idea is to simply store the left, right, up, down neighbors for
# the structured (R, Z) data and then convert it back to the
# unstructured form.
point_matrix = self.vector_to_matrix(point_index)
right_matrix = point_matrix.copy()
left_matrix = point_matrix.copy()
up_matrix = point_matrix.copy()
down_matrix = point_matrix.copy()
# We need to take care of the boundaries which are given by NaN
# neighbors. Here we set the index to the point itself which can be
# used straightforwardly in a forward stencil
LL = ~np.isnan(point_matrix[:, 1])
right_matrix[LL, 0] = point_matrix[LL, 1]
LL = ~np.isnan(point_matrix[:, -2])
left_matrix[LL, -1] = point_matrix[LL, -2]
for i in range(1, len(point_matrix.R) - 1):
LL = ~np.isnan(point_matrix[:, i + 1])
right_matrix[LL, i] = point_matrix[LL, i + 1]
LL = ~np.isnan(point_matrix[:, i - 1])
left_matrix[LL, i] = point_matrix[LL, i - 1]
LL = ~np.isnan(point_matrix[1, :])
up_matrix[0, LL] = point_matrix[1, LL]
LL = ~np.isnan(point_matrix[-2, :])
down_matrix[-1, LL] = point_matrix[-2, LL]
for i in range(1, len(point_matrix.Z) - 1):
LL = ~np.isnan(point_matrix[i + 1, :])
up_matrix[i, LL] = point_matrix[i + 1, LL]
LL = ~np.isnan(point_matrix[i - 1, :])
down_matrix[i, LL] = point_matrix[i - 1, LL]
self.right_index = self.matrix_to_vector(right_matrix).astype(int)
self.left_index = self.matrix_to_vector(left_matrix).astype(int)
self.up_index = self.matrix_to_vector(up_matrix).astype(int)
self.down_index = self.matrix_to_vector(down_matrix).astype(int)
[docs]
def vector_to_matrix(
self, unstructured_data, filled: bool=False, allow_rechunk: bool=True
):
"""
Convert an array from the unstructured to structured representation.
Converts tricolumn data 'z(..., points, ....)'
to matrix data 'z(..., R, Z, ...)'
If unstructured_data is supplied as an xarray, the 'points' dimension
is shaped. If unstructured_data is supplied as a numpy array, it can
only be 1D
"""
if not isinstance(unstructured_data, xr.DataArray):
assert unstructured_data.ndim == 1
unstructured_data = xr.DataArray(unstructured_data, dims="points")
assert unstructured_data.sizes["points"] == self.size, \
("Unstructured data not the same size as input data. "
f"Unstructured data size: {unstructured_data.sizes['points']}, "
f"Input data size: {self.size}.")
if not self._vector_to_matrix_initialized:
self.setup_vector_to_matrix()
self.setup_filled_vector_to_matrix()
self._vector_to_matrix_initialized = True
structured_data = xr.apply_ufunc(
_stack,
unstructured_data,
kwargs=dict(
filled=filled,
sort_indices=self.sort_indices,
nan_vector=self.nan_vector,
filled_point_index=self.filled_point_index,
),
input_core_dims=[
["points"],
],
output_core_dims=[["Z", "R"]],
keep_attrs=True,
vectorize=True,
dask="parallelized",
dask_gufunc_kwargs=dict(
output_sizes={"Z": self.z_s.size, "R": self.r_s.size},
allow_rechunk=allow_rechunk,
),
output_dtypes=[np.float64],
).assign_coords(R=self.r_s, Z=self.z_s)
return structured_data
[docs]
def matrix_to_vector(self, structured_data, allow_rechunk: bool = True):
"""
Convert an array from the structured to unstructured representation.
Converts matrix data 'z(..., R, Z, ...)'
to tricolumn data 'z(..., points, ....)'
"""
if not isinstance(structured_data, xr.DataArray):
assert structured_data.ndim == 2
structured_data = xr.DataArray(structured_data, dims=("Z", "R"))
assert structured_data.sizes["R"] == self.r_s.size
assert structured_data.sizes["Z"] == self.z_s.size
if not self._vector_to_matrix_initialized:
self.setup_vector_to_matrix()
self.setup_filled_vector_to_matrix()
self._vector_to_matrix_initialized = True
unstructured_data = xr.apply_ufunc(
_flatten,
structured_data,
kwargs=dict(
nan_vector=self.nan_vector,
reverse_sort_indices=self.reverse_sort_indices,
),
input_core_dims=[
["Z", "R"],
],
output_core_dims=[["points"]],
keep_attrs=True,
vectorize=True,
dask="parallelized",
dask_gufunc_kwargs=dict(
output_sizes={"points": self.size}, allow_rechunk=allow_rechunk
),
output_dtypes=[np.float64],
)
return unstructured_data
[docs]
def perpendicular_gradient(self, array: xr.DataArray):
"""Return the perpendicular gradient of the given array as a vector."""
grad_r = self._gradient_R(array)
grad_z = self._gradient_Z(array)
return poloidal_vector(grad_r, grad_z,
dims=array.dims,
attrs=grad_r.attrs)
[docs]
def perpendicular_gradient_component(self, array: xr.DataArray, \
component="R"):
"""
Return a specific component of the perpendicular gradient.
Component can either be a string specifying "R" or "Z", or
alternatively an xarray DataArray representing a vector where to
project the gradient to.
"""
if isinstance(component, str):
if component == "R":
return self._gradient_R(array)
elif component == "Z":
return self._gradient_Z(array)
else:
raise RuntimeError(f"Component {component} is not available " \
"for perpendicular gradient.")
elif isinstance(component, xr.DataArray):
assert "vector" in component.dims, \
"Component should be a vector with dimension vector " \
"in the DataArray."
grad_r = self._gradient_R(array)
grad_z = self._gradient_Z(array)
grad_proj = component.sel(vector="eR") * grad_r \
+ component.sel(vector="eZ") * grad_z
if ("norm" in array.attrs and "norm" in component.attrs):
grad_proj.attrs["norm"] = component.norm * grad_r.norm
return grad_proj
[docs]
def radial_perpendicular_gradient(self, array: xr.DataArray, equi):
"""Return the radial component of the perpendicular gradient."""
e_rho = equi.magfield_vector_radial(self.r_u, self.z_u, normalize=True)
return self.perpendicular_gradient_component(array, e_rho)
def _gradient_R(self, array: xr.DataArray):
"""
Return the R component of the perpendicular gradient.
Base method to calculate the R component of the perpendicular gradient,
utilizing the neighbor index arrays created in the setup routine. Uses
second order central differences in the interior and first order
forward/backward differences at the boundaries. Works on structured or
unstructured data. The latter is preferred for speed.
"""
if not self._vector_to_matrix_initialized:
self.setup_vector_to_matrix()
self.setup_filled_vector_to_matrix()
self._vector_to_matrix_initialized = True
if not self._perpendicular_gradient_initialized:
self.setup_perpendicular_gradient()
self._perpendicular_gradient_initialized = True
if not "points" in array.dims:
assert ("R" in array.dims and "Z" in array.dims), \
"Neither dimension points, R or Z were found in the array."
# If array given in structured R, Z, we convert to unstructured
# points, perform the calculation and convert back in the end
array = self.matrix_to_vector(array)
convert_to_RZ = True
else:
convert_to_RZ = False
grad_r = (array.isel({"points":self.right_index}) \
- array.isel({"points":self.left_index})) \
* 0.5 / self.r_spacing
# Apply correction at edge point to achieve first order forward stencil
edge = np.logical_or(self.right_index==self.right_index.points, \
self.left_index==self.left_index.points)
grad_r.loc[{"points":edge}] = grad_r.isel({"points":edge}) * 2.0
# If no norm attribute given, assume normalized units
if "norm" in array.attrs:
grad_r.attrs["norm"] = array.norm / self.R0
if convert_to_RZ:
return self.vector_to_matrix(grad_r)
else:
return grad_r
def _gradient_Z(self, array: xr.DataArray):
"""
Return the Z component of the perpendicular gradient.
Base method to calculate the Z component of the perpendicular gradient,
utilizing the neighbor index arrays created in the setup routine. Uses
second order central differences in the interior and first order
forward/backward differences at the boundaries. Works on structured or
unstructured data. The latter is preferred for speed.
"""
if not self._vector_to_matrix_initialized:
self.setup_vector_to_matrix()
self.setup_filled_vector_to_matrix()
self._vector_to_matrix_initialized = True
if not self._perpendicular_gradient_initialized:
self.setup_perpendicular_gradient()
self._perpendicular_gradient_initialized = True
if not "points" in array.dims:
assert ("R" in array.dims and "Z" in array.dims), \
"Neither dimension points, R or Z were found in the array."
array = self.matrix_to_vector(array)
convert_to_RZ = True
else:
convert_to_RZ = False
grad_z = (array.isel({"points":self.up_index}) \
- array.isel({"points":self.down_index})) \
* 0.5 / self.z_spacing
edge = np.logical_or(self.up_index==self.up_index.points, \
self.down_index==self.down_index.points)
grad_z.loc[{"points":edge}] = grad_z.isel({"points":edge}) * 2.0
# If no norm attribute given, assume normalized units
if "norm" in array.attrs:
grad_z.attrs["norm"] = array.norm / self.R0
if convert_to_RZ:
return self.vector_to_matrix(grad_z)
else:
return grad_z
[docs]
def get_unstructured_index(self, rval, zval):
"""Return index of r_u and z_u that correspond to rval and zval."""
ui = 0
smallest = 1e10
for i in range(len(self.r_u)):
dist = np.sqrt((self.r_u[i] - rval)**2 \
+ (self.z_u[i] - zval)**2)
if dist < smallest:
ui = i
smallest = dist
return ui
[docs]
def coords_like(self, array: xr.DataArray):
"""Return (R,Z) coordinates that fit the dims of the given array."""
assert(isinstance(array, xr.DataArray)), \
"Array must be of type xarray DataArray."
if("R" in array.dims and "Z" in array.dims):
r_norm = self.r_s
z_norm = self.z_s
elif("points" in array.dims):
r_norm = self.r_u
z_norm = self.z_u
else:
r_norm = None
z_norm = None
return r_norm, z_norm
@njit
def _stack(unstructured_data, filled, sort_indices, nan_vector,
filled_point_index):
"""Return the inner shaping loop for vector_to_matrix."""
result = np.zeros(filled_point_index.shape)
if not filled:
result = (unstructured_data[sort_indices] + nan_vector).reshape(
filled_point_index.shape
)
else:
for i in range(filled_point_index.shape[0]):
for j in range(filled_point_index.shape[1]):
result[i, j] = unstructured_data[filled_point_index[i, j]]
return result
@njit
def _flatten(input_array, nan_vector, reverse_sort_indices):
"""Return the inner flatten loop for matrix_to_vector."""
return input_array.flatten()[np.logical_not(np.isnan(nan_vector))][
reverse_sort_indices
]