Source code for torx.geometry.poincare_polygon_set_m

"""Polygon sets forming Poincare structures."""
import numpy as np
import xarray as xr
from scipy.interpolate import LinearNDInterpolator, CloughTocher2DInterpolator
from torx.autodoc_decorators_m import autodoc_class
from torx.geometry.ordered_polygon_2d_m import OrderedPolygon2D

[docs] @autodoc_class class PoincarePolygonSet: """ Set of ordered polygon objects constructed from Poincare data. Allows computing a radial coordinate based on the polygon areas. """
[docs] def __init__(self, poincare_data: xr.DataArray, phi_array: xr.DataArray, start_idx: int=0, k: int=20, n_interp: int=0, smooth: float=None): """ Initialize the set from Poincare data from PARALLAX. The polygons are used to define flux surfaces and compute a radial coordinate based on their average areas. Parameters ---------- poincare_data : xarray.DataArray 4D array with dimensions (dim_surf, dim_turn, dim_plane, dim_xy) containing the Poincare points from PARALLAX. phi_array : xarray.DataArray Array of toroidal angles corresponding to each plane. start_idx : int, optional Index of the first point to include. k : int, optional Number of nearest neighbors for polygon construction. n_interp : int, optional Number of points to interpolate along polygon edges. smooth : float, optional Smoothing parameter for polygon edges, by default None. """ assert(type(poincare_data) == xr.DataArray), \ "Poincare data must be Xarray DataArray." required_dims = {"dim_surf", "dim_turn", "dim_plane", "dim_xy"} assert required_dims.issubset(poincare_data.dims), \ f"Expected dims {required_dims}, got {poincare_data.dims}" self.n_surfaces = poincare_data.sizes["dim_surf"] self.n_turns = poincare_data.sizes["dim_turn"] self.n_planes = poincare_data.sizes["dim_plane"] assert(poincare_data.sizes["dim_xy"] == 2), \ "Poincare data must have dim_xy of size 2." # Create OrderedPolygon2D objects from Poincare data polygon_data = xr.apply_ufunc( OrderedPolygon2D.from_2d_array, poincare_data, start_idx, k, n_interp, smooth, input_core_dims=[['dim_turn', 'dim_xy'],[],[],[],[]], output_core_dims=[[]], vectorize=True, output_dtypes=[object] ) # Compute areas polygon_area = xr.apply_ufunc( lambda p: p.signed_area(), polygon_data, input_core_dims=[[]], output_core_dims=[[]], vectorize=True, output_dtypes=[float] ) # Mean area over planes mean_area = polygon_area.mean(dim='dim_plane') # Sort everything according to mean area sort_indices = mean_area.argsort() self.polygon_data = polygon_data.isel(dim_surf=sort_indices) self.poincare_data = poincare_data.isel(dim_surf=sort_indices) self.mean_area = mean_area.isel(dim_surf=sort_indices) # Compute normalized radial coordinate rho self.rho_data = self.mean_area / self.mean_area[-1] self.phi_array = phi_array self._rho_interpolators = None
[docs] def __getitem__(self, key: int | tuple) -> object: """ Get item(s) from the polygon set using indices. [i_plane, i_surf, i_point, i_coord]. """ # Normalize key to tuple and check that it has 1-4 elements if not isinstance(key, tuple): key = (key,) if not 1 <= len(key) <= 4: raise IndexError( "Indexing requires 1–4 indices: [i_plane, i_surf, i_point, i_coord]." ) # Normalize the key to a 4-tuple (pad with slices as needed) # Think of slice(None) as ":", it works as a "pass-through" operator # The alternative would be to do many else if statements key = key + (slice(None),) * (4 - len(key)) i_plane, i_surf, i_point, i_coord = key sliced = self.polygon_data.isel(dim_plane=i_plane, dim_surf=i_surf) # If fully indexed return a scalar if all(isinstance(i, (int, np.integer)) for i in key): return sliced.item()[i_point, i_coord] # Determine output core dimensions based on key output_core_dims = [] # If i_point is a slice, the first output dimension is 'dim_point' if isinstance(i_point, slice): output_core_dims.append('dim_turn') # If i_coord is a slice, the second output dimension is 'dim_coord' if isinstance(i_coord, slice): output_core_dims.append('dim_xy') def _extract_polygon_slice(poly: OrderedPolygon2D, i_point, i_coord): if i_coord == slice(None): return poly[i_point] else: return poly[i_point, i_coord] # when there are slices, use apply_ufunc to extract the data return xr.apply_ufunc( _extract_polygon_slice, sliced, i_point, i_coord, input_core_dims=[[], [], []], output_core_dims=[output_core_dims], vectorize=True, output_dtypes=[np.float64] )
[docs] @classmethod def from_file(cls, filename:str, start_idx: int=0, k: int=20, n_interp=0, smooth: float=None): """Initialize from a file produced with PARALLAX diagnose_poincare.""" poincare = xr.open_dataset(filename) return cls(poincare.poincare_data, poincare.phi_array, start_idx, k, n_interp, smooth)
[docs] @classmethod def from_torx_poincare(cls, starting_points, n_turns): """Initialize from torx field line tracing.""" # Placeholder - implement field line tracing raise NotImplementedError("Field line tracing not implemented yet.")
@property def rho_interp(self): """ Lazy-loaded interpolator for rho(R, Z, phi). Returns an xarray of interpolators for each plane. """ if self._rho_interpolators is None: # Create interpolators for each plane def _plane_interpolator(poincare_data, phi, rho_data): R_data = poincare_data[:,:,0].flatten() Z_data = poincare_data[:,:,1].flatten() rho_data = np.broadcast_to(rho_data[:, np.newaxis], poincare_data[:,:,0].shape).flatten() coords = np.column_stack([R_data, Z_data]) return CloughTocher2DInterpolator(coords, rho_data) self._rho_interpolators = xr.apply_ufunc( _plane_interpolator, self.poincare_data, self.phi_array, self.rho_data, # Separate along dim_plane input_core_dims=[["dim_surf", "dim_turn", "dim_xy"], [], ["dim_surf"]], output_core_dims=[[]], vectorize=True, output_dtypes=[object], ) return self._rho_interpolators
[docs] def rho(self, R, Z, phi): """Return interpolated rho(R, Z, phi).""" # Find closest phi phi_closest = abs(self.phi_array-phi).argmin(dim='dim_plane') # Get interpolator for that plane interp = self.rho_interp.isel(dim_plane=phi_closest).item() # Return interpolated value return interp(R, Z)
[docs] def reset_interpolator(self): """Reset the interpolator.""" self._rho_interpolators = None