"""Polygon sets forming Poincare structures."""
import numpy as np
import xarray as xr
from scipy.interpolate import LinearNDInterpolator, CloughTocher2DInterpolator
from torx.autodoc_decorators_m import autodoc_class
from torx.geometry.ordered_polygon_2d_m import OrderedPolygon2D
[docs]
@autodoc_class
class PoincarePolygonSet:
"""
Set of ordered polygon objects constructed from Poincare data.
Allows computing a radial coordinate based on the polygon areas.
"""
[docs]
def __init__(self, poincare_data: xr.DataArray, phi_array: xr.DataArray, start_idx: int=0,
k: int=20, n_interp: int=0, smooth: float=None):
"""
Initialize the set from Poincare data from PARALLAX.
The polygons are used to define flux surfaces and compute
a radial coordinate based on their average areas.
Parameters
----------
poincare_data : xarray.DataArray
4D array with dimensions (dim_surf, dim_turn, dim_plane, dim_xy)
containing the Poincare points from PARALLAX.
phi_array : xarray.DataArray
Array of toroidal angles corresponding to each plane.
start_idx : int, optional
Index of the first point to include.
k : int, optional
Number of nearest neighbors for polygon construction.
n_interp : int, optional
Number of points to interpolate along polygon edges.
smooth : float, optional
Smoothing parameter for polygon edges, by default None.
"""
assert(type(poincare_data) == xr.DataArray), \
"Poincare data must be Xarray DataArray."
required_dims = {"dim_surf", "dim_turn", "dim_plane", "dim_xy"}
assert required_dims.issubset(poincare_data.dims), \
f"Expected dims {required_dims}, got {poincare_data.dims}"
self.n_surfaces = poincare_data.sizes["dim_surf"]
self.n_turns = poincare_data.sizes["dim_turn"]
self.n_planes = poincare_data.sizes["dim_plane"]
assert(poincare_data.sizes["dim_xy"] == 2), \
"Poincare data must have dim_xy of size 2."
# Create OrderedPolygon2D objects from Poincare data
polygon_data = xr.apply_ufunc(
OrderedPolygon2D.from_2d_array,
poincare_data,
start_idx, k, n_interp, smooth,
input_core_dims=[['dim_turn', 'dim_xy'],[],[],[],[]],
output_core_dims=[[]],
vectorize=True,
output_dtypes=[object]
)
# Compute areas
polygon_area = xr.apply_ufunc(
lambda p: p.signed_area(),
polygon_data,
input_core_dims=[[]],
output_core_dims=[[]],
vectorize=True,
output_dtypes=[float]
)
# Mean area over planes
mean_area = polygon_area.mean(dim='dim_plane')
# Sort everything according to mean area
sort_indices = mean_area.argsort()
self.polygon_data = polygon_data.isel(dim_surf=sort_indices)
self.poincare_data = poincare_data.isel(dim_surf=sort_indices)
self.mean_area = mean_area.isel(dim_surf=sort_indices)
# Compute normalized radial coordinate rho
self.rho_data = self.mean_area / self.mean_area[-1]
self.phi_array = phi_array
self._rho_interpolators = None
[docs]
def __getitem__(self, key: int | tuple) -> object:
"""
Get item(s) from the polygon set using indices.
[i_plane, i_surf, i_point, i_coord].
"""
# Normalize key to tuple and check that it has 1-4 elements
if not isinstance(key, tuple):
key = (key,)
if not 1 <= len(key) <= 4:
raise IndexError(
"Indexing requires 1–4 indices: [i_plane, i_surf, i_point, i_coord]."
)
# Normalize the key to a 4-tuple (pad with slices as needed)
# Think of slice(None) as ":", it works as a "pass-through" operator
# The alternative would be to do many else if statements
key = key + (slice(None),) * (4 - len(key))
i_plane, i_surf, i_point, i_coord = key
sliced = self.polygon_data.isel(dim_plane=i_plane, dim_surf=i_surf)
# If fully indexed return a scalar
if all(isinstance(i, (int, np.integer)) for i in key):
return sliced.item()[i_point, i_coord]
# Determine output core dimensions based on key
output_core_dims = []
# If i_point is a slice, the first output dimension is 'dim_point'
if isinstance(i_point, slice):
output_core_dims.append('dim_turn')
# If i_coord is a slice, the second output dimension is 'dim_coord'
if isinstance(i_coord, slice):
output_core_dims.append('dim_xy')
def _extract_polygon_slice(poly: OrderedPolygon2D, i_point, i_coord):
if i_coord == slice(None):
return poly[i_point]
else:
return poly[i_point, i_coord]
# when there are slices, use apply_ufunc to extract the data
return xr.apply_ufunc(
_extract_polygon_slice,
sliced,
i_point,
i_coord,
input_core_dims=[[], [], []],
output_core_dims=[output_core_dims],
vectorize=True,
output_dtypes=[np.float64]
)
[docs]
@classmethod
def from_file(cls, filename:str, start_idx: int=0,
k: int=20, n_interp=0, smooth: float=None):
"""Initialize from a file produced with PARALLAX diagnose_poincare."""
poincare = xr.open_dataset(filename)
return cls(poincare.poincare_data, poincare.phi_array,
start_idx, k, n_interp, smooth)
[docs]
@classmethod
def from_torx_poincare(cls, starting_points, n_turns):
"""Initialize from torx field line tracing."""
# Placeholder - implement field line tracing
raise NotImplementedError("Field line tracing not implemented yet.")
@property
def rho_interp(self):
"""
Lazy-loaded interpolator for rho(R, Z, phi).
Returns an xarray of interpolators for each plane.
"""
if self._rho_interpolators is None:
# Create interpolators for each plane
def _plane_interpolator(poincare_data, phi, rho_data):
R_data = poincare_data[:,:,0].flatten()
Z_data = poincare_data[:,:,1].flatten()
rho_data = np.broadcast_to(rho_data[:, np.newaxis],
poincare_data[:,:,0].shape).flatten()
coords = np.column_stack([R_data, Z_data])
return CloughTocher2DInterpolator(coords, rho_data)
self._rho_interpolators = xr.apply_ufunc(
_plane_interpolator,
self.poincare_data,
self.phi_array,
self.rho_data,
# Separate along dim_plane
input_core_dims=[["dim_surf", "dim_turn", "dim_xy"], [], ["dim_surf"]],
output_core_dims=[[]],
vectorize=True,
output_dtypes=[object],
)
return self._rho_interpolators
[docs]
def rho(self, R, Z, phi):
"""Return interpolated rho(R, Z, phi)."""
# Find closest phi
phi_closest = abs(self.phi_array-phi).argmin(dim='dim_plane')
# Get interpolator for that plane
interp = self.rho_interp.isel(dim_plane=phi_closest).item()
# Return interpolated value
return interp(R, Z)
[docs]
def reset_interpolator(self):
"""Reset the interpolator."""
self._rho_interpolators = None