# Source code for torx.grid.multigrid_operations_m

"""
Operators based on the low-level interface to the multigrid.

These have good performance, but are specific to GRILLIX only. You can
generally use them as a drop-in replacement.
"""
from numba import njit
import xarray as xr
import numpy as np
from scipy.interpolate import griddata
from torx.autodoc_decorators_m import autodoc_function

@autodoc_function
def calc_r_u(mesh_lvl: xr.DataArray) -> xr.DataArray:
    """Radial coordinate of every node on the unstructured grid.

    Computed from the 1-based cartesian index ``cart_i`` and the fine-grid
    spacing ``spacing_f``, offset from the grid origin ``xmin``.
    """
    radial_offset = (mesh_lvl.cart_i - 1) * mesh_lvl.spacing_f
    return mesh_lvl.xmin + radial_offset

@autodoc_function
def calc_z_u(mesh_lvl: xr.DataArray) -> xr.DataArray:
    """Vertical coordinate of every node on the unstructured grid.

    Computed from the 1-based cartesian index ``cart_j`` and the fine-grid
    spacing ``spacing_f``, offset from the grid origin ``ymin``.
    """
    vertical_offset = (mesh_lvl.cart_j - 1) * mesh_lvl.spacing_f
    return mesh_lvl.ymin + vertical_offset

@autodoc_function
def calc_r_s(mesh_lvl: xr.DataArray) -> np.ndarray:
    """Structured radial coordinate.

    Returns the sorted, de-duplicated radial positions of the grid nodes.
    Note: ``np.unique`` returns a plain ``np.ndarray`` (sorted ascending),
    not an ``xr.DataArray`` — the return annotation reflects that.
    """
    return np.unique(calc_r_u(mesh_lvl))

@autodoc_function
def calc_z_s(mesh_lvl: xr.DataArray) -> np.ndarray:
    """Structured vertical coordinate.

    Returns the sorted, de-duplicated vertical positions of the grid nodes.
    Note: ``np.unique`` returns a plain ``np.ndarray`` (sorted ascending),
    not an ``xr.DataArray`` — the return annotation reflects that.
    """
    return np.unique(calc_z_u(mesh_lvl))

@autodoc_function
def vector_to_matrix(
    mesh_lvl: xr.DataArray,
    unstructured_data: xr.DataArray
) -> xr.DataArray:
    """Convert unstructured (per-point) data to a structured (Z, R) matrix."""

    @njit
    def _scatter(array_in, cart_i, cart_j, size_out, spacing_ratio):
        # Scatter each point value into its (row, col) cell of a NaN-filled
        # matrix; cells with no corresponding point remain NaN.
        assert cart_i.size == cart_j.size == array_in.size

        out = np.zeros(size_out, dtype=array_in.dtype) * np.nan
        i_min = cart_i.min()
        j_min = cart_j.min()

        for idx in range(cart_i.size):
            row = (cart_j[idx] - j_min) // spacing_ratio
            col = (cart_i[idx] - i_min) // spacing_ratio
            out[row, col] = array_in[idx]

        return out

    r_s = calc_r_s(mesh_lvl)
    z_s = calc_z_s(mesh_lvl)

    # Keyword arguments forwarded to the scatter kernel for every slice.
    scatter_kwargs = dict(
        cart_i=mesh_lvl.cart_i.values,
        cart_j=mesh_lvl.cart_j.values,
        size_out=(z_s.size, r_s.size),
        spacing_ratio=int(mesh_lvl.spacing_c / mesh_lvl.spacing_f),
    )

    with xr.set_options(keep_attrs=True):
        structured_data = xr.apply_ufunc(
            _scatter,
            unstructured_data,
            kwargs=scatter_kwargs,
            # "points" is consumed; ("Z", "R") is produced; any remaining
            # dimensions are looped over (vectorize=True).
            input_core_dims=[["points"]],
            output_core_dims=[["Z", "R"]],
            vectorize=True,
            # Dask parallelism — works best if unstructured_data is itself
            # dask-backed; output sizes must be declared up front.
            dask="parallelized",
            dask_gufunc_kwargs={"output_sizes": {"Z": z_s.size, "R": r_s.size}},
            output_dtypes=[np.float64],
        )

    return structured_data.assign_coords(R=r_s, Z=z_s)

@autodoc_function
def matrix_to_vector(
    mesh_lvl: xr.DataArray,
    structured_data: xr.DataArray
) -> xr.DataArray:
    """Convert structured (Z, R) data back to the unstructured (points) form."""

    @njit
    def _gather(array_in, cart_i, cart_j, spacing_ratio):
        # Read, for each grid point, the matrix cell it maps to.
        assert cart_i.size == cart_j.size

        i_min = cart_i.min()
        j_min = cart_j.min()

        out = np.zeros(cart_i.size) * np.nan
        for idx in range(cart_i.size):
            row = (cart_j[idx] - j_min) // spacing_ratio
            col = (cart_i[idx] - i_min) // spacing_ratio
            out[idx] = array_in[row, col]

        return out

    gather_kwargs = dict(
        cart_i=mesh_lvl.cart_i.values,
        cart_j=mesh_lvl.cart_j.values,
        spacing_ratio=int(mesh_lvl.spacing_c / mesh_lvl.spacing_f),
    )

    with xr.set_options(keep_attrs=True):
        unstructured_data = xr.apply_ufunc(
            _gather,
            structured_data,
            kwargs=gather_kwargs,
            # ("Z", "R") is consumed; "points" is produced; remaining
            # dimensions are looped over (vectorize=True).
            input_core_dims=[["Z", "R"]],
            output_core_dims=[["points"]],
            vectorize=True,
            dask="parallelized",
            dask_gufunc_kwargs={"output_sizes": {"points": mesh_lvl.cart_i.size}},
            output_dtypes=[np.float64],
        )

    return unstructured_data

@autodoc_function
def inner_to_full(mesh_lvl: xr.DataArray) -> xr.DataArray:
    """
    Return a matrix of indices for extrapolating arrays from inner to full grid.

    Usage: full_array = inner_array.isel(inner_points=inner_to_full(mesh_lvl))
    """
    # Nearest-neighbour lookup: for every full-grid point, find the index of
    # the closest inner-grid point (inner_indices appears to be 1-based, hence
    # the "- 1" — TODO confirm against the mesh definition).
    index_array = griddata(
        # Input mesh (inner-grid coordinates)
        (
            mesh_lvl.cart_i[mesh_lvl.inner_indices - 1],
            mesh_lvl.cart_j[mesh_lvl.inner_indices - 1],
        ),
        # Input values (inner-grid indices)
        np.arange(mesh_lvl.inner_indices.size),
        # Output mesh (full-grid coordinates)
        (mesh_lvl.cart_i, mesh_lvl.cart_j),
        method="nearest",
    ).astype(int)

    return xr.DataArray(index_array, dims="points")
@autodoc_function
def full_to_inner(mesh_lvl: xr.DataArray) -> xr.DataArray:
    """
    Return a matrix of indices for extrapolating arrays from full to inner grid.

    Usage: inner_array = full_array.isel(points=full_to_inner(mesh_lvl))
    """
    # Nearest-neighbour lookup: for every inner-grid point, find the index of
    # the closest full-grid point.
    index_array = griddata(
        # Input mesh (full-grid coordinates)
        (mesh_lvl.cart_i, mesh_lvl.cart_j),
        # Input values (full-grid indices)
        np.arange(mesh_lvl.size),
        # Output mesh (inner-grid coordinates; inner_indices appears to be
        # 1-based, hence the "- 1" — TODO confirm against the mesh definition)
        (
            mesh_lvl.cart_i[mesh_lvl.inner_indices - 1],
            mesh_lvl.cart_j[mesh_lvl.inner_indices - 1],
        ),
        method="nearest",
    ).astype(int)

    return xr.DataArray(index_array, dims=["inner_points"])