"""Defines routines for calculating integral quantities."""
import numpy as np
import xarray as xr
from scipy.interpolate import RectBivariateSpline
from torx import make_xarray
from torx import Quantity
from torx.grid import Grid2D, Grid3D
from torx.autodoc_decorators_m import autodoc_function
from torx.normalization.normalization_m import Normalization
from typing import Callable, Sequence
def poloidal_integration(
grid: Grid2D, norm: Normalization, poloidal_function: xr.DataArray
):
"""
Return the 2D poloidal integral of a function.
Off-grid values are assigned a value of 0.0, and a filled grid spline
interpolator is used to evaluate the integral.
"""
assert hasattr(poloidal_function, "norm")
assert isinstance(poloidal_function.norm, Quantity)
grid_limits = {
"xa": grid.r_s.min(),
"xb": grid.r_s.max(),
"ya": grid.z_s.min(),
"yb": grid.z_s.max(),
}
poloidal_function_values = np.nan_to_num(poloidal_function.values, nan=0.0)
function_integral = RectBivariateSpline(
grid.r_s, grid.z_s, poloidal_function_values.T
).integral(**grid_limits)
norm = poloidal_function.norm * norm.R0**2
return function_integral * norm
def axisymmetric_cylindrical_integration(
grid: Grid2D, norm: Normalization, poloidal_function: xr.DataArray
):
"""
Return the 3D cylindrically-integrated value of a function.
Note
----
Automatically takes care of the "R" factor introduced by the theta
integral.
"""
r_s_grid, _ = np.meshgrid(grid.r_s, grid.z_s)
poloidal_integration_function = make_xarray(
poloidal_function * r_s_grid, norm=poloidal_function.norm
)
function_integral = (
2.0
* np.pi
* norm.R0
* poloidal_integration(grid, norm, poloidal_integration_function)
)
return function_integral
[docs]
@autodoc_function
def flux_surf_avg_3D(
grid: Grid3D,
vol: xr.DataArray,
field: xr.DataArray,
rho_func: Callable,
rho_array: Sequence,
drho: float = 1e-4,
):
"""
Return the flux-surface average of a given field on a 3D grid.
It integrates the field over an infinitesimal volume shell centered on the flux surface [1].
[1] A. Stegmeir et al., CPC 2026, Appendix C.
Parameters
----------
grid : Grid3D
The 3D grid in which the field is defined.
vol : xr.DataArray
The fluxbox volume at each point of the grid (can be obtained from the maps).
field : xr.DataArray
The field to be averaged. Must have dimensions compatible with the grid.
rho_func : Callable
A function that computes the flux surface label (rho) from R, Z, phi coordinates.
rho_array : Sequence
The array of rho values at which to compute the flux-surface average
drho : float, optional
The small increment in rho used for numerical differentiation, by default 1e-4.
Returns
-------
xr.DataArray
The flux-surface averaged field.
"""
R = grid.r_u
Z = grid.z_u
phi = grid.coords["phi"]
rho_array = xr.DataArray(rho_array, dims="rho")
# Check if the desired dimension name to be replaced exists
if 'planes' in field.dims:
field = field.rename({'planes': 'phi'})
if 'npoints_max' in field.dims:
field = field.rename({'npoints_max': 'points'})
# Compute rho field
rho_values = xr.apply_ufunc(
rho_func,
R,
Z,
phi,
input_core_dims=[["points"], ["points"], []],
output_core_dims=[["points"]],
vectorize=True,
dask='parallelized',
output_dtypes=[np.float64]
).transpose()
# Define a small function to compute the flux-surface average for one rho
def _fsa_for_rho(r, rho_values, field, vol, drho):
mask = (rho_values >= r - drho / 2) & (rho_values < r + drho / 2)
if mask.sum() == 0:
raise ValueError(f"No points found in the rho range [{r - drho / 2}, {r + drho / 2}]")
return (vol[mask]*field[mask]).sum() / vol[mask].sum()
# Use apply_ufunc to vectorize over rho_array
fsa = xr.apply_ufunc(
_fsa_for_rho,
rho_array,
rho_values,
field,
vol,
drho,
input_core_dims=[[], ["points", "phi"], ["points", "phi"], ["points", "phi"], []],
output_core_dims=[[]],
vectorize=True,
dask='parallelized',
output_dtypes=[np.float64]
).assign_coords(rho=rho_array)
return fsa