Source code for torx.equilibrium.numerical_m

"""Contains numerical tokamak equilibrium."""
import numpy as np
from pathlib import Path
import xarray as xr
from xarray import Dataset
from scipy.interpolate import RectBivariateSpline, interp1d
from collections import defaultdict
import warnings
import dask

from .equilibrium_m import EquilibriumBaseClass

from torx import Quantity
from torx.units_handling_m import check_units
from torx import make_xarray, convert_xarray_to_quantity
from torx.autogrid_decorators_m import autogrid_method
from torx.geometry.polygon_2d_m import Polygon2D
from torx.autodoc_decorators_m import autodoc_class
from torx.analysis.magnetic_field_tracer_m import MagneticFieldTracer

[docs] @autodoc_class class NumericalEquilibrium(EquilibriumBaseClass): """Equilibrium with a divertor defined by numerical poloidal flux."""
[docs] @classmethod def initialize_from_params(cls, filepath: Path, params: defaultdict): """Create an equilibrium from a parameter file.""" path_to_netcdf = Path(params["equi_numerical_params"]["path_to_netcdf"]) equilibrium_case = params["equi_numerical_params"]["equilibrium_case"] equi_file = Path(str(filepath / path_to_netcdf / equilibrium_case) + ".nc") assert ( equi_file.exists() ), f"Error: equilibrium file {equi_file.absolute()} not found" return cls.initialize_from_equi_file(equi_file)
[docs] @classmethod def initialize_from_equi_file(cls, equi_file: Path): """Create an equilibrium from an equilibrium file.""" assert equi_file.exists() and equi_file.suffix == ".nc" magnetic_geometry = xr.open_dataset(equi_file, group="Magnetic_geometry") psi_limits = xr.open_dataset(equi_file, group="Psi_limits") # Add "psi_separatrix" to psi_limits if legacy spelling was used in # provided file if "psi_seperatrix" in psi_limits.attrs: psi_limits.attrs["psi_separatrix"] = psi_limits.psi_seperatrix divertor_polygon = xr.open_dataset(equi_file, group="divertor_polygon") exclusion_polygon = xr.open_dataset(equi_file, group="exclusion_polygon") return cls(magnetic_geometry, psi_limits, divertor_polygon, equi_file, exclusion_polygon)
[docs] def __init__( self, magnetic_geometry: Dataset, psi_limits: Dataset, divertor_polygon: Dataset, equi_file: Path, exclusion_polygon: Dataset=None, B0=Quantity("1T") ): """ Initialize the equilibrium with datasets from equilibrium file. Requires the "Magnetic_geometry" and "Psi_limits" groups from an equilibrium NetCDF. Additionally, a norm object can be given which determines the normalization of the lengths and magnetic fields. Note ---- The equilibrium file is assumed to provide length and flux data in normalized units. The normalization is assumed to be w.r.t. the radial location of the magnetic axis and the axis field. B0 will determine the normalization used in the magnetic field evaluation. The normalization of length is fixed to the axis location. """ self.B0 = B0 self.filepath = equi_file self.magnetic_geometry = magnetic_geometry self.psi_limits = psi_limits # Add "psi_separatrix" to psi_limits if legacy spelling was used in # provided file if "psi_seperatrix" in self.psi_limits.attrs: self.psi_limits.attrs["psi_separatrix"] = self.psi_limits.psi_seperatrix self.poloidal_field_factor = +1.0 self._flipped_Z = False self.axis_r = make_xarray(magnetic_geometry.magnetic_axis_R, norm=Quantity(magnetic_geometry.magnetic_axis_units)) self.axis_z = make_xarray(magnetic_geometry.magnetic_axis_Z, norm=Quantity(magnetic_geometry.magnetic_axis_units)) # Per definition the normalization of the length is the axis R location self._norm_length = Quantity(magnetic_geometry.magnetic_axis_R, magnetic_geometry.magnetic_axis_units) try: self.x_point_r = make_xarray(magnetic_geometry.x_point_R, norm=Quantity(magnetic_geometry.x_point_units)) self.x_point_z = make_xarray(magnetic_geometry.x_point_Z, norm=Quantity(magnetic_geometry.x_point_units)) except AttributeError: self.x_point_r = None self.x_point_z = None print( "No X-point data available in the NetCDF. " + "Consider updating the equilibrium file with " + "the latest version of parallax_equilibrium" ) self.field_on_axis = make_xarray( magnetic_geometry.axis_Btor, norm=Quantity(1, magnetic_geometry.axis_Btor_units), ) # Read the basis vectors for the spline (in normalized units) r_base_norm = Quantity(magnetic_geometry.magnetic_axis_R, magnetic_geometry.magnetic_axis_units) self.spline_basis_r = make_xarray(magnetic_geometry.R, norm=r_base_norm) self.spline_basis_z = make_xarray(magnetic_geometry.Z, norm=r_base_norm) # Read the psi data (in Weber) self.psi_data = make_xarray( self.magnetic_geometry.psi, norm=Quantity(1, self.psi_units) ) # Note that the axis ordering is inverted relative to the output of # meshgrid. self.psi_interpolator = RectBivariateSpline( self.spline_basis_r, self.spline_basis_z, self.psi_data.T ) # Save the divertor_polygon self.divertor_polygon = Polygon2D( divertor_polygon["R_points"], divertor_polygon["Z_points"], invert_polygon=divertor_polygon.invert, ) if exclusion_polygon is not None: self.exclusion_polygon = Polygon2D( exclusion_polygon["R_points"], exclusion_polygon["Z_points"], invert_polygon=exclusion_polygon.invert, ) else: self.exclusion_polygon = None
[docs] def flip_Z(self, preserve_helicity: bool = True): """ Flip the vertical direction of the equilibrium. Affects toroidal field reversal. """ self._flipped_Z = not self._flipped_Z if preserve_helicity: self.poloidal_field_factor *= -1.0 self.axis_z *= -1.0 if self.x_point_z is not None: self.x_point_z *= -1.0 self.spline_basis_z = -self.spline_basis_z[::-1] self.psi_data = self.psi_data[::-1, :] self.psi_interpolator = RectBivariateSpline( self.spline_basis_r, self.spline_basis_z, self.psi_data.T ) self.divertor_polygon.flip_Z() if self.exclusion_polygon is not None: self.exclusion_polygon.flip_Z()
@property def poloidal_flux_on_axis(self): """Poloidal flux quantity on axis.""" return make_xarray(self.psi_limits.psi_axis, norm=Quantity(1, self.psi_units)) @property def poloidal_flux_on_separatrix(self): """Poloidal flux quantity on separatrix.""" return make_xarray( self.psi_limits.psi_separatrix, norm=Quantity(1, self.psi_units) ) @property def psi_units(self): """Units of the poloidal flux.""" try: return self.magnetic_geometry.psi_units except AttributeError: return self.magnetic_geometry.psi.units @property def axis_r_norm(self): """Magnetic axis radial position in normalized units.""" return make_xarray(1.0) @property def axis_z_norm(self): """Magnetic axis vertical position in normalized units.""" return make_xarray(self.axis_z / self.axis_r) @property def x_point_r_norm(self): """Magnetic X-point radial position in normalized units.""" return make_xarray(self.x_point_r / self.axis_r) @property def x_point_z_norm(self): """Magnetic X-point vertical position in normalized units.""" return make_xarray(self.x_point_z / self.axis_r) @property def axis_btor(self): """Returns the on-axis field as a quantity.""" return convert_xarray_to_quantity(self.field_on_axis) @property def B0(self) -> Quantity: """Magnetic field normalization.""" return self._norm_magfield @B0.setter def B0(self, value: Quantity): """Set the magnetic field normalization.""" check_units(value, {"[mass]":1, "[time]":-2, "[current]":-1}, "B0") self._norm_magfield = value @property def R0(self) -> Quantity: """Length normalization.""" return self._norm_length @R0.setter def R0(self, value: Quantity): """Set the length normalization.""" check_units(value, {"[length]":1}, "R0") self._norm_length = value
[docs] @autogrid_method def poloidal_flux(self, r_norm, z_norm, **kwargs): """Return the poloidal flux at a point.""" r_norm = self.convert_length_to_normalized(r_norm) z_norm = self.convert_length_to_normalized(z_norm) is_structured = kwargs["is_structured"] psi = make_xarray( self.psi_interpolator(x=r_norm, y=z_norm, grid=is_structured).T, norm=Quantity(1, self.psi_units), dims=kwargs["dims"], coords=kwargs["coords"] ) return psi
[docs] @autogrid_method def normalized_flux_surface_label(self, r_norm, z_norm, **kwargs) \ -> xr.DataArray: """Return the normalized flux surface label at a point.""" r_norm = self.convert_length_to_normalized(r_norm) z_norm = self.convert_length_to_normalized(z_norm) rho = self._poloidal_flux_to_rho(self.poloidal_flux(r_norm, z_norm)) return xr.DataArray(rho, attrs={"norm":Quantity(1)}, dims=kwargs["dims"], coords=kwargs["coords"])
def _poloidal_flux_to_rho(self, psi_value): """Convert poloidal flux (psi) to normalized flux coordinate (rho).""" psi_ax = self.poloidal_flux_on_axis psi_sep = self.poloidal_flux_on_separatrix rho_squared = (psi_value - psi_ax) / (psi_sep - psi_ax) rho_squared = np.atleast_1d(rho_squared) # NOTE: Because the evaluation precision of the poloidal flux might be # limited, it is not guaranteed that the flux at the axis (which # is stored at init) is larger than the evaluated flux at axis. # Thus we set a floor of zero for rho squared. rho_squared[np.asarray(rho_squared < 0.0)] = 0.0 if(np.size(rho_squared) == 1): rho_squared = rho_squared[0] return np.sqrt(rho_squared)
[docs] @autogrid_method def magfield_component_r(self, r_norm, z_norm, **kwargs): """Return the radial (R) magnetic field at a point.""" is_structured = kwargs["is_structured"] r_norm = self.convert_length_to_normalized(r_norm) z_norm = self.convert_length_to_normalized(z_norm) # Evaluate the poloidal flux, taking the 0th derivative in x (R) # and the 1st derivative in y(Z). Then calculate the magnetic field # according to eq. 2.25 of [1]. # [1] H. Zohm, MHD stability of Tokamaks, p24 psi_dz = xr.DataArray( self.psi_interpolator(x=r_norm, y=z_norm, dx=0, dy=1, grid=is_structured).T, dims=kwargs["dims"], coords=kwargs["coords"] ) # NOTE: In the normalization, one factor of R0 comes from the gradient # of psi and one from the normalized R used in the formula. norm = self.axis_r**2 if not np.logical_and( isinstance(r_norm, xr.DataArray), isinstance(z_norm, xr.DataArray) ): norm = norm.values b_r = psi_dz / (2 * np.pi * r_norm * norm) b_r = -self.poloidal_field_factor * b_r return xr.DataArray(b_r / self.B0.magnitude, attrs={"norm":self.B0}, dims=kwargs["dims"], coords=kwargs["coords"])
[docs] @autogrid_method def magfield_component_z(self, r_norm, z_norm, **kwargs): """Return the vertical (Z) magnetic field at a point.""" is_structured = kwargs["is_structured"] r_norm = self.convert_length_to_normalized(r_norm) z_norm = self.convert_length_to_normalized(z_norm) # Evaluate the poloidal flux, taking the 1st derivative in x (R) # and the 0th derivative in y(Z). Then calculate the magnetic field # according to eq. 2.24 of [1]. # [1] H. Zohm, MHD stability of Tokamaks, p24 psi_dr = xr.DataArray( self.psi_interpolator(x=r_norm, y=z_norm, dx=1, dy=0, grid=is_structured).T, dims=kwargs["dims"], coords=kwargs["coords"] ) norm = self.axis_r**2 if not np.logical_and( isinstance(r_norm, xr.DataArray), isinstance(z_norm, xr.DataArray) ): norm = norm.values b_z = psi_dr / (2 * np.pi * r_norm * norm) b_z = self.poloidal_field_factor * b_z return xr.DataArray(b_z / self.B0.magnitude, attrs={"norm":self.B0}, dims=kwargs["dims"], coords=kwargs["coords"])
[docs] @autogrid_method def magfield_component_toroidal(self, r_norm, z_norm, **kwargs): """Return the toroidal (Phi) magnetic field at a point.""" if kwargs["is_structured"]: r_mesh, _ = np.meshgrid(r_norm, z_norm) else: r_mesh = r_norm r_mesh = self.convert_length_to_normalized(r_mesh) return xr.DataArray(1.0 / r_mesh * (self.axis_btor / self.B0).m, attrs={"norm":self.B0}, dims=kwargs["dims"], coords=kwargs["coords"])
[docs] def get_boundary_polygon(self): """Return the divertor polygon.""" return self.divertor_polygon
[docs] def get_limiter_polygon(self, quantile=0.999, npoints=1000, tolerance=1e-3): """ Return the limiter polygon. Defined as the last closed non-separatrix surface. With the quantile argument, one can set how close in psi the limiter should be located. """ # NOTE: We currently ignore flux surface wander integrator = MagneticFieldTracer(self, max_flux_surface_wander=1.0) R_0 = np.array(self.axis_r_norm) Z_0 = np.array(self.axis_z_norm) R_max = np.max(self.get_boundary_polygon().x_points) R_u = np.linspace(R_0, R_max, npoints) psi_u = self.psi_interpolator(x=R_u, y=Z_0, dx=0, dy=0, grid=False) psi_start = self.psi_limits.psi_axis \ + (self.psi_limits.psi_separatrix \ - self.psi_limits.psi_axis) * quantile R_start = interp1d(psi_u, R_u, kind="linear")(psi_start) poloidal_trace = integrator.poloidal_integration( r_initial=float(R_start), z_initial=float(Z_0), theta_max=2.0*np.pi, tolerance=tolerance) theta = np.linspace(0.0, 2.0*np.pi, npoints, endpoint=False) R, Z, _ = poloidal_trace.sol(theta) limiter = Polygon2D(R, Z) return limiter
[docs] def safety_factor(self, rloc, zloc, tolerance=1e-3, return_sol=False): """ Return the safety factor calculated at the given location. Uses a poloidal field line trace. Uses parallelization to speed up the computations. Optionally returns ode solution. """ rloc = np.atleast_1d(rloc) zloc = np.atleast_1d(zloc) assert rloc.size == zloc.size, "R and Z arrays must have same size!" # NOTE: We currently ignore flux surface wander integrator = MagneticFieldTracer(self, max_flux_surface_wander=1.0) q_factor = [self._single_safety_factor(integrator, r, z, tolerance, return_sol=return_sol) \ for r, z in zip(rloc, zloc)] q_factor = dask.compute(*q_factor, scheduler="processes") # NOTE: The dask call returns tuples or tuples of tuples depending on # the number of outputs of the safety factor function. These need # to be converted to simpler structures if return_sol: q_factor, sol = list(zip(*q_factor)) return np.array(q_factor), list(sol) else: return np.array(q_factor)
@dask.delayed def _single_safety_factor(self, integrator, r, z, tolerance=1e-3, return_sol=False): """ Calculate the safety factor for a single location. Optionally returns the ode solution. """ if self.normalized_flux_surface_label(r, z) >= 1.0: return np.nan poloidal_trace = integrator.poloidal_integration( r_initial=r, z_initial=z, theta_max=2.0*np.pi, tolerance=tolerance) theta_sym_max = poloidal_trace.sol(2.0 * np.pi)[2] q_factor = theta_sym_max / (2.0 * np.pi) if return_sol: return q_factor, poloidal_trace.sol else: return q_factor
[docs] def gradshaf_star(self, r_norm, z_norm, component=None): """ Evaluate the Laplace star operator of the Grad-Shafranov equation. One can specify to evaluate the radial (component="R"), vertical (component="Z") or the total operator (None). """ assert any([component == y for y in (None, "R", "Z")]) if (component is None) or (component=="R"): # We use the product rule in the formulation to be able to utilize # the derivative from the interpolator dpsi = self.psi_interpolator(x=r_norm, y=z_norm, \ dx=1, dy=0, grid=False) d2psi = self.psi_interpolator(x=r_norm, y=z_norm, \ dx=2, dy=0, grid=False) res_R = d2psi - dpsi / r_norm if (component=="R"): return res_R res_Z = self.psi_interpolator(x=r_norm, y=z_norm, \ dx=0, dy=2, grid=False) if (component=="Z"): return res_Z return res_R + res_Z