Source code for torx.analysis.integrations_m

"""Defines routines for calculating integral quantities."""
import numpy as np
import xarray as xr
from scipy.interpolate import RectBivariateSpline
from torx import make_xarray
from torx import Quantity
from torx.grid import Grid2D, Grid3D
from torx.autodoc_decorators_m import autodoc_function
from torx.normalization.normalization_m import Normalization
from typing import Callable, Sequence

def poloidal_integration(
    grid: Grid2D, norm: Normalization, poloidal_function: xr.DataArray
):
    """
    Return the 2D poloidal integral of a function.

    Off-grid values are assigned a value of 0.0, and a filled grid spline
    interpolator is used to evaluate the integral.
    """
    assert hasattr(poloidal_function, "norm")
    assert isinstance(poloidal_function.norm, Quantity)

    grid_limits = {
        "xa": grid.r_s.min(),
        "xb": grid.r_s.max(),
        "ya": grid.z_s.min(),
        "yb": grid.z_s.max(),
    }

    poloidal_function_values = np.nan_to_num(poloidal_function.values, nan=0.0)

    function_integral = RectBivariateSpline(
        grid.r_s, grid.z_s, poloidal_function_values.T
    ).integral(**grid_limits)

    norm = poloidal_function.norm * norm.R0**2

    return function_integral * norm


def axisymmetric_cylindrical_integration(
    grid: Grid2D, norm: Normalization, poloidal_function: xr.DataArray
):
    """
    Return the 3D cylindrically-integrated value of a function.

    Note
    ----
    Automatically takes care of the "R" factor introduced by the theta
    integral.
    """
    r_s_grid, _ = np.meshgrid(grid.r_s, grid.z_s)
    poloidal_integration_function = make_xarray(
        poloidal_function * r_s_grid, norm=poloidal_function.norm
    )
    function_integral = (
        2.0
        * np.pi
        * norm.R0
        * poloidal_integration(grid, norm, poloidal_integration_function)
    )

    return function_integral

[docs] @autodoc_function def flux_surf_avg_3D( grid: Grid3D, vol: xr.DataArray, field: xr.DataArray, rho_func: Callable, rho_array: Sequence, drho: float = 1e-4, ): """ Return the flux-surface average of a given field on a 3D grid. It integrates the field over an infinitesimal volume shell centered on the flux surface [1]. [1] A. Stegmeir et al., CPC 2026, Appendix C. Parameters ---------- grid : Grid3D The 3D grid in which the field is defined. vol : xr.DataArray The fluxbox volume at each point of the grid (can be obtained from the maps). field : xr.DataArray The field to be averaged. Must have dimensions compatible with the grid. rho_func : Callable A function that computes the flux surface label (rho) from R, Z, phi coordinates. rho_array : Sequence The array of rho values at which to compute the flux-surface average drho : float, optional The small increment in rho used for numerical differentiation, by default 1e-4. Returns ------- xr.DataArray The flux-surface averaged field. """ R = grid.r_u Z = grid.z_u phi = grid.coords["phi"] rho_array = xr.DataArray(rho_array, dims="rho") # Check if the desired dimension name to be replaced exists if 'planes' in field.dims: field = field.rename({'planes': 'phi'}) if 'npoints_max' in field.dims: field = field.rename({'npoints_max': 'points'}) # Compute rho field rho_values = xr.apply_ufunc( rho_func, R, Z, phi, input_core_dims=[["points"], ["points"], []], output_core_dims=[["points"]], vectorize=True, dask='parallelized', output_dtypes=[np.float64] ).transpose() # Define a small function to compute the flux-surface average for one rho def _fsa_for_rho(r, rho_values, field, vol, drho): mask = (rho_values >= r - drho / 2) & (rho_values < r + drho / 2) if mask.sum() == 0: raise ValueError(f"No points found in the rho range [{r - drho / 2}, {r + drho / 2}]") return (vol[mask]*field[mask]).sum() / vol[mask].sum() # Use apply_ufunc to vectorize over rho_array fsa = xr.apply_ufunc( _fsa_for_rho, rho_array, rho_values, field, vol, drho, input_core_dims=[[], ["points", "phi"], ["points", "phi"], ["points", "phi"], []], output_core_dims=[[]], vectorize=True, dask='parallelized', output_dtypes=[np.float64] ).assign_coords(rho=rho_array) return fsa