Source code for torx.grid.grid_2d_m

"""Grid that provides common functionality for different 2D grid types."""
from abc import ABC
import numpy as np
import xarray as xr
import pandas as pd
from scipy.interpolate import griddata
from numba import njit

from torx import Quantity
from torx.units_handling_m import check_units
from torx.vector import poloidal_vector
from torx.autodoc_decorators_m import autodoc_class

[docs] @autodoc_class class Grid2D(ABC): """2D regular grid on a poloidal cross section."""
[docs] def __init__(self, r_unstructured, z_unstructured, norm: Quantity=Quantity("1m"), init_matrices=False): """ Create a grid with the unstructured (R,Z) coordinates provided. Accepts optionally a filepath to read in GRILLIX trunk data. Accepts optionally a length quantity to normalize the grid. """ self.r_u, self.r_s, r_spacing, r_size = \ self.basis_from_unstructured_array(r_unstructured) self.z_u, self.z_s, z_spacing, z_size = \ self.basis_from_unstructured_array(z_unstructured) self.r_u = xr.DataArray(self.r_u, dims="points") self.r_s = xr.DataArray(self.r_s, dims="R") self.z_u = xr.DataArray(self.z_u, dims="points") self.z_s = xr.DataArray(self.z_s, dims="Z") assert(r_size == z_size), \ "Expected equal sized coordinate arrays but found " \ f"{r_size} != {z_size}" self.size = r_size self.shape = (self.r_s.size, self.z_s.size) self.r_spacing, self.z_spacing = r_spacing, z_spacing if np.isclose(r_spacing, z_spacing): self.spacing = r_spacing self._norm = norm self._parallel_length_scale = None self._perpendicular_length_scale = None self._flipped_z = False if init_matrices: self.setup_vector_to_matrix() self.setup_filled_vector_to_matrix() self.setup_perpendicular_gradient() self._vector_to_matrix_initialized = True self._perpendicular_gradient_initialized = True else: self._vector_to_matrix_initialized = False self._perpendicular_gradient_initialized = False
@property def R0(self): """Normalized radius.""" return self._norm @R0.setter def R0(self, value: Quantity): """Set the normalized radius.""" check_units(value, {"[length]":1}, "R0") self._norm = value
[docs] @staticmethod def basis_from_unstructured_array(x_unstructured): """Find the spacing and unique values of unstructured coordinates.""" x_unstructured = np.array(x_unstructured) assert x_unstructured.ndim == 1 x_structured = np.unique(x_unstructured) x_structured = x_structured[~np.isnan(x_structured)] spacing = np.mean(np.diff(x_structured)) assert np.allclose( np.diff(x_structured), spacing ), "Error: basis vector is not equally spaced." return x_unstructured, x_structured, spacing, x_unstructured.size
[docs] def setup_vector_to_matrix(self): """Calculate arrays to improve the performance of vector to matrix.""" tricolumn_data = np.column_stack( (self.r_u, self.z_u, np.arange(start=0, stop=self.size, dtype=int)) ) pd_dataframe = pd.DataFrame(tricolumn_data, columns=["x", "y", "z"]) # Makes a 2D array of indices shaped_data = pd_dataframe.pivot_table( values="z", index="y", columns="x", dropna=False ) # Flatten array to give a vector flattened_data = np.array(shaped_data).flatten() # Replaces NaN with zero (will return the z[0] element in this place) self.sort_indices = np.nan_to_num(flattened_data).astype("int") # Add NaN vector to z[self.sort_indices] to replace z[0] with NaN self.nan_vector = np.zeros_like(flattened_data) self.nan_vector[np.isnan(flattened_data)] = np.nan forward_sort = flattened_data[ np.logical_not(np.isnan(self.nan_vector)) ].astype(int) assert forward_sort.size == self.size self.reverse_sort_indices = np.zeros_like(forward_sort) self.reverse_sort_indices[forward_sort] = np.arange(forward_sort.size)
[docs] def setup_filled_vector_to_matrix(self): """ Calculate the array to perform a filled vector to matrix. Calculates the 'filled point index' array which can be used to index an unstructured array and return a filled 2D array, using nearest neighbor interpolation for off-grid points. """ point_index = xr.DataArray(np.arange(self.size), dims="points") r_mesh, z_mesh = np.meshgrid(self.r_s, self.z_s) self.filled_point_index = griddata( ((self.r_u, self.z_u)), point_index, (r_mesh, z_mesh), method="nearest" ).astype(int)
[docs] def setup_perpendicular_gradient(self): """ Calculate array to create a perpendicular gradient operator on the grid. Create neighbor index arrays for left, right, up and down neighbors that can be used to evaluate the perpendicular gradient """ point_index = xr.DataArray(np.arange(self.size), dims="points") # The idea is to simply store the left, right, up, down neighbors for # the structured (R, Z) data and then convert it back to the # unstructured form. point_matrix = self.vector_to_matrix(point_index) right_matrix = point_matrix.copy() left_matrix = point_matrix.copy() up_matrix = point_matrix.copy() down_matrix = point_matrix.copy() # We need to take care of the boundaries which are given by NaN # neighbors. Here we set the index to the point itself which can be # used straightforwardly in a forward stencil LL = ~np.isnan(point_matrix[:, 1]) right_matrix[LL, 0] = point_matrix[LL, 1] LL = ~np.isnan(point_matrix[:, -2]) left_matrix[LL, -1] = point_matrix[LL, -2] for i in range(1, len(point_matrix.R) - 1): LL = ~np.isnan(point_matrix[:, i + 1]) right_matrix[LL, i] = point_matrix[LL, i + 1] LL = ~np.isnan(point_matrix[:, i - 1]) left_matrix[LL, i] = point_matrix[LL, i - 1] LL = ~np.isnan(point_matrix[1, :]) up_matrix[0, LL] = point_matrix[1, LL] LL = ~np.isnan(point_matrix[-2, :]) down_matrix[-1, LL] = point_matrix[-2, LL] for i in range(1, len(point_matrix.Z) - 1): LL = ~np.isnan(point_matrix[i + 1, :]) up_matrix[i, LL] = point_matrix[i + 1, LL] LL = ~np.isnan(point_matrix[i - 1, :]) down_matrix[i, LL] = point_matrix[i - 1, LL] self.right_index = self.matrix_to_vector(right_matrix).astype(int) self.left_index = self.matrix_to_vector(left_matrix).astype(int) self.up_index = self.matrix_to_vector(up_matrix).astype(int) self.down_index = self.matrix_to_vector(down_matrix).astype(int)
[docs] def vector_to_matrix( self, unstructured_data, filled: bool=False, allow_rechunk: bool=True ): """ Convert an array from the unstructured to structured representation. Converts tricolumn data 'z(..., points, ....)' to matrix data 'z(..., R, Z, ...)' If unstructured_data is supplied as an xarray, the 'points' dimension is shaped. If unstructured_data is supplied as a numpy array, it can only be 1D """ if not isinstance(unstructured_data, xr.DataArray): assert unstructured_data.ndim == 1 unstructured_data = xr.DataArray(unstructured_data, dims="points") assert unstructured_data.sizes["points"] == self.size, \ ("Unstructured data not the same size as input data. " f"Unstructured data size: {unstructured_data.sizes['points']}, " f"Input data size: {self.size}.") if not self._vector_to_matrix_initialized: self.setup_vector_to_matrix() self.setup_filled_vector_to_matrix() self._vector_to_matrix_initialized = True structured_data = xr.apply_ufunc( _stack, unstructured_data, kwargs=dict( filled=filled, sort_indices=self.sort_indices, nan_vector=self.nan_vector, filled_point_index=self.filled_point_index, ), input_core_dims=[ ["points"], ], output_core_dims=[["Z", "R"]], keep_attrs=True, vectorize=True, dask="parallelized", dask_gufunc_kwargs=dict( output_sizes={"Z": self.z_s.size, "R": self.r_s.size}, allow_rechunk=allow_rechunk, ), output_dtypes=[np.float64], ).assign_coords(R=self.r_s, Z=self.z_s) return structured_data
[docs] def matrix_to_vector(self, structured_data, allow_rechunk: bool = True): """ Convert an array from the structured to unstructured representation. Converts matrix data 'z(..., R, Z, ...)' to tricolumn data 'z(..., points, ....)' """ if not isinstance(structured_data, xr.DataArray): assert structured_data.ndim == 2 structured_data = xr.DataArray(structured_data, dims=("Z", "R")) assert structured_data.sizes["R"] == self.r_s.size assert structured_data.sizes["Z"] == self.z_s.size if not self._vector_to_matrix_initialized: self.setup_vector_to_matrix() self.setup_filled_vector_to_matrix() self._vector_to_matrix_initialized = True unstructured_data = xr.apply_ufunc( _flatten, structured_data, kwargs=dict( nan_vector=self.nan_vector, reverse_sort_indices=self.reverse_sort_indices, ), input_core_dims=[ ["Z", "R"], ], output_core_dims=[["points"]], keep_attrs=True, vectorize=True, dask="parallelized", dask_gufunc_kwargs=dict( output_sizes={"points": self.size}, allow_rechunk=allow_rechunk ), output_dtypes=[np.float64], ) return unstructured_data
[docs] def perpendicular_gradient(self, array: xr.DataArray): """Return the perpendicular gradient of the given array as a vector.""" grad_r = self._gradient_R(array) grad_z = self._gradient_Z(array) return poloidal_vector(grad_r, grad_z, dims=array.dims, attrs=grad_r.attrs)
[docs] def perpendicular_gradient_component(self, array: xr.DataArray, \ component="R"): """ Return a specific component of the perpendicular gradient. Component can either be a string specifying "R" or "Z", or alternatively an xarray DataArray representing a vector where to project the gradient to. """ if isinstance(component, str): if component == "R": return self._gradient_R(array) elif component == "Z": return self._gradient_Z(array) else: raise RuntimeError(f"Component {component} is not available " \ "for perpendicular gradient.") elif isinstance(component, xr.DataArray): assert "vector" in component.dims, \ "Component should be a vector with dimension vector " \ "in the DataArray." grad_r = self._gradient_R(array) grad_z = self._gradient_Z(array) grad_proj = component.sel(vector="eR") * grad_r \ + component.sel(vector="eZ") * grad_z if ("norm" in array.attrs and "norm" in component.attrs): grad_proj.attrs["norm"] = component.norm * grad_r.norm return grad_proj
[docs] def radial_perpendicular_gradient(self, array: xr.DataArray, equi): """Return the radial component of the perpendicular gradient.""" e_rho = equi.magfield_vector_radial(self.r_u, self.z_u, normalize=True) return self.perpendicular_gradient_component(array, e_rho)
def _gradient_R(self, array: xr.DataArray): """ Return the R component of the perpendicular gradient. Base method to calculate the R component of the perpendicular gradient, utilizing the neighbor index arrays created in the setup routine. Uses second order central differences in the interior and first order forward/backward differences at the boundaries. Works on structured or unstructured data. The latter is preferred for speed. """ if not self._vector_to_matrix_initialized: self.setup_vector_to_matrix() self.setup_filled_vector_to_matrix() self._vector_to_matrix_initialized = True if not self._perpendicular_gradient_initialized: self.setup_perpendicular_gradient() self._perpendicular_gradient_initialized = True if not "points" in array.dims: assert ("R" in array.dims and "Z" in array.dims), \ "Neither dimension points, R or Z were found in the array." # If array given in structured R, Z, we convert to unstructured # points, perform the calculation and convert back in the end array = self.matrix_to_vector(array) convert_to_RZ = True else: convert_to_RZ = False grad_r = (array.isel({"points":self.right_index}) \ - array.isel({"points":self.left_index})) \ * 0.5 / self.r_spacing # Apply correction at edge point to achieve first order forward stencil edge = np.logical_or(self.right_index==self.right_index.points, \ self.left_index==self.left_index.points) grad_r.loc[{"points":edge}] = grad_r.isel({"points":edge}) * 2.0 # If no norm attribute given, assume normalized units if "norm" in array.attrs: grad_r.attrs["norm"] = array.norm / self.R0 if convert_to_RZ: return self.vector_to_matrix(grad_r) else: return grad_r def _gradient_Z(self, array: xr.DataArray): """ Return the Z component of the perpendicular gradient. Base method to calculate the Z component of the perpendicular gradient, utilizing the neighbor index arrays created in the setup routine. Uses second order central differences in the interior and first order forward/backward differences at the boundaries. Works on structured or unstructured data. The latter is preferred for speed. """ if not self._vector_to_matrix_initialized: self.setup_vector_to_matrix() self.setup_filled_vector_to_matrix() self._vector_to_matrix_initialized = True if not self._perpendicular_gradient_initialized: self.setup_perpendicular_gradient() self._perpendicular_gradient_initialized = True if not "points" in array.dims: assert ("R" in array.dims and "Z" in array.dims), \ "Neither dimension points, R or Z were found in the array." array = self.matrix_to_vector(array) convert_to_RZ = True else: convert_to_RZ = False grad_z = (array.isel({"points":self.up_index}) \ - array.isel({"points":self.down_index})) \ * 0.5 / self.z_spacing edge = np.logical_or(self.up_index==self.up_index.points, \ self.down_index==self.down_index.points) grad_z.loc[{"points":edge}] = grad_z.isel({"points":edge}) * 2.0 # If no norm attribute given, assume normalized units if "norm" in array.attrs: grad_z.attrs["norm"] = array.norm / self.R0 if convert_to_RZ: return self.vector_to_matrix(grad_z) else: return grad_z
[docs] def get_unstructured_index(self, rval, zval): """Return index of r_u and z_u that correspond to rval and zval.""" ui = 0 smallest = 1e10 for i in range(len(self.r_u)): dist = np.sqrt((self.r_u[i] - rval)**2 \ + (self.z_u[i] - zval)**2) if dist < smallest: ui = i smallest = dist return ui
[docs] def coords_like(self, array: xr.DataArray): """Return (R,Z) coordinates that fit the dims of the given array.""" assert(isinstance(array, xr.DataArray)), \ "Array must be of type xarray DataArray." if("R" in array.dims and "Z" in array.dims): r_norm = self.r_s z_norm = self.z_s elif("points" in array.dims): r_norm = self.r_u z_norm = self.z_u else: r_norm = None z_norm = None return r_norm, z_norm
@njit def _stack(unstructured_data, filled, sort_indices, nan_vector, filled_point_index): """Return the inner shaping loop for vector_to_matrix.""" result = np.zeros(filled_point_index.shape) if not filled: result = (unstructured_data[sort_indices] + nan_vector).reshape( filled_point_index.shape ) else: for i in range(filled_point_index.shape[0]): for j in range(filled_point_index.shape[1]): result[i, j] = unstructured_data[filled_point_index[i, j]] return result @njit def _flatten(input_array, nan_vector, reverse_sort_indices): """Return the inner flatten loop for matrix_to_vector.""" return input_array.flatten()[np.logical_not(np.isnan(nan_vector))][ reverse_sort_indices ]