"""
Operators based on the low-level interface to the multigrid.
These have good performance, but are specific to GRILLIX only. You can
generally use them as a drop-in replacement.
"""
from numba import njit
import xarray as xr
import numpy as np
from scipy.interpolate import griddata
from torx.autodoc_decorators_m import autodoc_function
@autodoc_function
def calc_r_u(mesh_lvl: xr.DataArray) -> xr.DataArray:
"""Unstructured radial coordinate."""
return mesh_lvl.xmin + (mesh_lvl.cart_i - 1) * mesh_lvl.spacing_f
@autodoc_function
def calc_z_u(mesh_lvl: xr.DataArray) -> xr.DataArray:
"""Unstructured vertical coordinate."""
return mesh_lvl.ymin + (mesh_lvl.cart_j - 1) * mesh_lvl.spacing_f
@autodoc_function
def calc_r_s(mesh_lvl: xr.DataArray) -> xr.DataArray:
"""Structured radial coordinate."""
return np.unique(calc_r_u(mesh_lvl))
@autodoc_function
def calc_z_s(mesh_lvl: xr.DataArray) -> xr.DataArray:
"""Structured vertical coordinate."""
return np.unique(calc_z_u(mesh_lvl))
@autodoc_function
def vector_to_matrix(
mesh_lvl: xr.DataArray,
unstructured_data: xr.DataArray
) -> xr.DataArray:
"""Convert unstructured to structured representation."""
@njit
def shape(array_in, cart_i, cart_j, size_out, spacing_ratio):
assert cart_i.size == cart_j.size == array_in.size
n_points = cart_i.size
out = np.zeros(size_out, dtype=array_in.dtype) * np.nan
i_min, j_min = cart_i.min(), cart_j.min()
for l in range(n_points):
out[
(cart_j[l] - j_min) // spacing_ratio,
(cart_i[l] - i_min) // spacing_ratio,
] = array_in[l]
return out
r_s, z_s = calc_r_s(mesh_lvl), calc_z_s(mesh_lvl)
with xr.set_options(keep_attrs=True):
structured_data = xr.apply_ufunc(
# First pass the function
shape,
# Next, pass the input array
unstructured_data,
# Pass in the function keyword arguments
kwargs=dict(
cart_i=mesh_lvl.cart_i.values,
cart_j=mesh_lvl.cart_j.values,
size_out=(z_s.size, r_s.size),
spacing_ratio=int(mesh_lvl.spacing_c / mesh_lvl.spacing_f),
),
# Provide a list of the 'core' dimensions, which are the ones that aren't looped over
input_core_dims=[
["points"],
],
# Provide a list of the output dimensions (which replaces input_core_dims)
output_core_dims=[["Z", "R"]],
# Loop over non-core dims
vectorize=True,
# Pass keywords to the dask core, to define the size of the returned array
# Switch on dask parallelism. Works best if unstructured_data is generated using the dask-based methods
dask="parallelized",
# Declare the size of the output_core_dims
dask_gufunc_kwargs={"output_sizes": {"Z": z_s.size, "R": r_s.size}},
# Declare the type of the output
output_dtypes=[np.float64],
)
return structured_data.assign_coords(R=r_s, Z=z_s)
@autodoc_function
def matrix_to_vector(
mesh_lvl: xr.DataArray,
structured_data: xr.DataArray
) -> xr.DataArray:
"""Convert structured to unstructured representation."""
@njit
def flatten(array_in, cart_i, cart_j, spacing_ratio):
assert cart_i.size == cart_j.size
i_min, j_min = cart_i.min(), cart_j.min()
n_points = cart_i.size
out = np.zeros(n_points) * np.nan
for l in range(n_points):
out[l] = array_in[
(cart_j[l] - j_min) // spacing_ratio,
(cart_i[l] - i_min) // spacing_ratio,
]
return out
with xr.set_options(keep_attrs=True):
unstructured_data = xr.apply_ufunc(
flatten,
structured_data,
kwargs=dict(
cart_i=mesh_lvl.cart_i.values,
cart_j=mesh_lvl.cart_j.values,
spacing_ratio=int(mesh_lvl.spacing_c / mesh_lvl.spacing_f),
),
input_core_dims=[["Z", "R"]],
output_core_dims=[
["points"],
],
vectorize=True,
dask="parallelized",
dask_gufunc_kwargs={"output_sizes": {"points": mesh_lvl.cart_i.size}},
output_dtypes=[np.float64],
)
return unstructured_data
[docs]
@autodoc_function
def inner_to_full(mesh_lvl: xr.DataArray) -> xr.DataArray:
"""
Return a matrix of indices for extrapolating arrays from inner to full grid.
Usage:
full_array = inner_array.isel(inner_points=inner_to_full(mesh_lvl))
"""
index_array = griddata(
# Input mesh
(
(
mesh_lvl.cart_i[mesh_lvl.inner_indices - 1],
mesh_lvl.cart_j[mesh_lvl.inner_indices - 1],
)
),
# Input values
np.arange(mesh_lvl.inner_indices.size),
# Output mesh
(mesh_lvl.cart_i, mesh_lvl.cart_j),
method="nearest",
).astype(int)
return xr.DataArray(index_array, dims="points")
[docs]
@autodoc_function
def full_to_inner(mesh_lvl: xr.DataArray) -> xr.DataArray:
"""
Return a matrix of indices for extrapolating arrays from full to inner grid.
Usage:
inner_array = full_array.isel(points=full_to_inner(mesh_lvl))
"""
# Use griddata to find the nearest full grid index for each inner grid point
index_array = griddata(
# Input mesh (full grid coordinates)
(mesh_lvl.cart_i, mesh_lvl.cart_j),
# Input values (full grid indices)
np.arange(mesh_lvl.size),
# Output mesh (inner grid coordinates)
(
mesh_lvl.cart_i[mesh_lvl.inner_indices - 1],
mesh_lvl.cart_j[mesh_lvl.inner_indices - 1],
),
method="nearest",
).astype(int)
# Flatten the index array to match the 1D structure of xarray selection
return xr.DataArray(index_array, dims=["inner_points"])