Source code for torx.analysis.magnetic_field_tracer_m

"""Classes for tracing magnetic field lines."""
import numpy as np
from typing import Union, List, Callable
from scipy.integrate import solve_ivp
from scipy.integrate._ivp.ivp import OdeResult
from scipy.interpolate import InterpolatedUnivariateSpline

from .vector_field_tracer_m import VectorFieldTracer
from torx.autodoc_decorators_m import autodoc_class

[docs] @autodoc_class class MagneticFieldTracer(VectorFieldTracer): """ Allows to trace along a magnetic fieldline. Toroidal angle is denoted as phi. Poloidal angle as theta. """
[docs] def __init__(self, equi, max_flux_surface_wander: float=1e-3): """Initialize the magnetic field tracer.""" # Only for NumericalEquilibrium the init of the parent class can be # called with a the discrete magnetic vector field if type(equi).__name__ == "NumericalEquilibrium": r_norm, z_norm = equi.spline_basis_r, equi.spline_basis_z vec_field = equi.magfield_vector(r_norm, z_norm) super().__init__(vec_field) self.equi = equi self.max_flux_surface_wander = max_flux_surface_wander
[docs] def toroidal_integration( self, r_initial: float, z_initial: float, phi_initial: float, phi_to_max: float, tolerance: float=1e-4, method: str="DOP853", direction: str="fwd", check_out_of_domain: bool=False, check_out_of_bounds: bool=False, events: Union[List[Callable], Callable]=None, **integrator_kwargs ) -> OdeResult: """ Perform an integration with phi as the independent variable. Returns the (x,y) positions of the trace, as well as the fieldline length. """ self.direction = direction self._check_initial_state(r_initial, z_initial, phi_initial) # Handle events and check format of the provided events if check_out_of_domain: events = self._add_event(events, self._out_of_domain) if check_out_of_bounds: events = self._add_event(events, self._out_of_bounds) solution = solve_ivp( fun=self.toroidal_integration_equation, t_span=[phi_initial, phi_initial + self._dir_fac * phi_to_max], y0=np.array([r_initial, z_initial, 0.0]), events=events, dense_output=True, rtol=tolerance, atol=tolerance, method=method, **integrator_kwargs, ) if not np.isnan(self.max_flux_surface_wander): self.check_flux_surface_wander(solution) return solution
[docs] def toroidal_integration_equation(self, phi, state): """ Equation that is required to integrate toroidally around the torus. The fieldline length is returned as the third element of the state vector. """ r_norm = state[0] z_norm = state[1] b_r = self.equi.magfield_component_r(r_norm, z_norm, phi) b_z = self.equi.magfield_component_z(r_norm, z_norm, phi) b_t = self.equi.magfield_component_toroidal(r_norm, z_norm, phi) jacobian = self.equi.jacobian(r_norm, z_norm, phi) d_state = np.zeros_like(state) d_state[0] = b_r / b_t * jacobian d_state[1] = b_z / b_t * jacobian d_state[2] = ( np.sqrt(b_r * b_r / (b_t * b_t) + b_z * b_z / (b_t * b_t) + 1.0) * jacobian ) return d_state
[docs] def poloidal_integration( self, r_initial: float, z_initial: float, theta_max: float, tolerance: float=1e-4, method: str="DOP853", check_out_of_domain: bool=False, check_out_of_bounds: bool=False, events: Union[List[Callable], Callable]=None, **integrator_kwargs, ) -> OdeResult: """ Perform an integration with theta as the independent variable. Valid only for the confined region. Returns the (x,y) positions of the trace, as well as the fieldline length. """ self._check_initial_state(r_initial, z_initial, 0.0) # Handle events and check format of the provided events if check_out_of_domain: events = self._add_event(events, self._out_of_domain) if check_out_of_bounds: events = self._add_event(events, self._out_of_bounds) # Make sure we are in the closed field line region rho_initial = self.equi.normalized_flux_surface_label(r_initial, z_initial).values if rho_initial >= 1.0: raise RuntimeError( "Cannot perform poloidal integration for open fieldlines." \ +f"x={r_initial}, y= {z_initial}, rho={rho_initial}" ) solution = solve_ivp( fun=self.poloidal_integration_equation, t_span=[0.0, theta_max], y0=np.array([r_initial, z_initial, 0.0]), events=events, dense_output=True, rtol=tolerance, atol=tolerance, method=method, **integrator_kwargs, ) self.check_flux_surface_wander(solution) return solution
[docs] def poloidal_integration_equation(self, _, state): """ Equation that is required to integrate poloidally around the torus. The fieldline length is returned as the third element of the state vector. """ r_norm = state[0] z_norm = state[1] b_r = self.equi.magfield_component_r(r_norm, z_norm) b_z = self.equi.magfield_component_z(r_norm, z_norm) theta = self.theta(r_norm, z_norm) radius = self.minor_radius(r_norm, z_norm) # Magnetic field component along the integration contour (theta) b_along_fs = -b_r * np.sin(theta) + b_z * np.cos(theta) # First check magnetic field obtained b_pol = np.sqrt(b_r**2 + b_z**2) # Magnetic field component across the integration contour (radial) b_across_fs = b_r * np.cos(theta) + b_z * np.sin(theta) b_pol_check = np.sqrt(b_across_fs**2 + b_along_fs**2) assert np.isclose(b_pol_check / b_pol, 1.0 ), f"Poloidal field on curve was {b_pol_check}, should be {b_pol}!" # After check passed, calculate increments d_state = np.zeros_like(state) # The increments dR and dZ are given by simply integrating the # equations for the magnetic field line (4.1.6) in [1] on # the poloidal curve. # [1] D'haeseleer, Flux coordinates and magnetic field structure # NOTE: The poloidal line element is r*dtheta. d_state[0] = radius * b_r / np.abs(b_along_fs) d_state[1] = radius * b_z / np.abs(b_along_fs) # The increment in symmetry angle is given by eq. (24) in [2]. # [2] Ribeiro, Conformal Tokamak Geometry for Turbulence Computations # NOTE: Here we return the product q*dtheta_s. Since the contravariant # component of the poloidal field is used, a factor 1/r enters the # denominator. # NOTE: The current I used in the formula is I = B_tor * R = B_tor_axis # since we set B_tor = B_tor_axis / R per definition. We use the # call to the toroidal field with axis coordinates since this # returns the field with the same normalization as used above. current = self.equi.magfield_component_toroidal(self.equi.axis_r_norm, self.equi.axis_z_norm) d_state[2] = current * np.abs(radius / b_along_fs / r_norm**2) return d_state
[docs] def check_flux_surface_wander(self, solution): """ Check the maximum deviation from the flux-surface during trace. Throws an error if it exceeds the threshold. """ rho_initial = self.equi.normalized_flux_surface_label( solution.y[0][0], solution.y[1][0] ).values dense_output = solution.sol( np.linspace(np.min(solution.t), np.max(solution.t), 1000) ) rho_trace = self.equi.normalized_flux_surface_label( dense_output[0], dense_output[1], ).values flux_surface_wander = np.abs(rho_trace - rho_initial) assert ( np.max(flux_surface_wander) < self.max_flux_surface_wander ), ( f"Flux surface wander exceeded threshold \ ({np.max(flux_surface_wander):3.2e} > {self.max_flux_surface_wander:3.2e}) \ Try decreasing the tolerance of the integrator, or increasing the \ max_flux_surface_wander" )
[docs] def find_neighboring_points( self, r_initial, z_initial, n_toroidal_planes: int=16 ): """ Find the neighbors in both directions of an array of sample points. Although this loop should be trivially parallel, dask parallelism doesn't seem to work here. """ r_initial = np.atleast_1d(r_initial) z_initial = np.atleast_1d(z_initial) assert r_initial.shape == z_initial.shape assert ( r_initial.ndim == 1 and z_initial.ndim == 1 ), f"Should provide r and z as 1D arrays" forward_trace = np.zeros((r_initial.size, 3)) reverse_trace = np.zeros((r_initial.size, 3)) print("Tracing", end=" ") for i in range(r_initial.size): print(f"{i}/{r_initial.size}", end=", ") r_in, z_in = r_initial[i], z_initial[i] forward_trace[i, :] = self.toroidal_integration( r_in, z_in, +2.0 * np.pi / n_toroidal_planes, ).y[:, -1] reverse_trace[i, :] = self.toroidal_integration( r_in, z_in, -2.0 * np.pi / n_toroidal_planes, ).y[:, -1] print("Done") return forward_trace, reverse_trace
[docs] def trace_reverse(self, r_initial, z_initial, n_phi_total, phi_initial = 0, n_phi=1): """ Trace field lines back for a given amount of toroidal planes. Starts at r,z initial and returns the new points r,z reverse. """ r_reverse = np.zeros(r_initial.size) z_reverse = np.zeros(r_initial.size) for i in range(r_initial.size): r_in, z_in = r_initial[i], z_initial[i] sol = self.toroidal_integration( r_in, z_in, phi_initial, (-2.0 * np.pi / n_phi_total) * n_phi, ) r_reverse[i] = sol.y[0, -1] z_reverse[i] = sol.y[1, -1] return r_reverse, z_reverse
[docs] def minor_radius(self, r_norm, z_norm): """Minor radius defined as Cartesian distance to magnetic axis.""" return np.sqrt((r_norm - self.equi.axis_r_norm.values)**2 + (z_norm - self.equi.axis_z_norm.values)**2)
[docs] def theta(self, r_norm, z_norm): """ Poloidal angle from the magnetic axis. Note ---- The zero point is the outboard midplane, and theta ranges from [-pi, pi]. """ return np.arctan2(z_norm - self.equi.axis_z_norm.values, r_norm - self.equi.axis_r_norm.values)
def _out_of_bounds(self, _, state): """ Check if integrator state is out of bounds. Return 1 if the state of the integrator is inside the boundary polygon, otherwise -1 """ r, z = state[0], state[1] boundary_polygon = self.equi.get_boundary_polygon() return 1 if boundary_polygon.point_inside(r, z) else -1 _out_of_bounds.terminal = True _out_of_bounds.direction = -1
@autodoc_class class NormalTracer(MagneticFieldTracer): """Finds a line which is continuously normal to the poloidal field.""" def toroidal_integration_equation(self, _, state): """ Equation that is required to integrate toroidally around the torus. The fieldline length is returned as the third element of the state vector. """ r_norm, z_norm = state[0], state[1] b_r, b_z, b_t, jacobian = ( self.equi.magfield_component_r(r_norm, z_norm), self.equi.magfield_component_z(r_norm, z_norm), self.equi.magfield_component_toroidal(r_norm, z_norm), self.equi.jacobian(r_norm, z_norm), ) d_state = np.zeros_like(state) d_state[0] = -b_z / b_t * jacobian d_state[1] = +b_r / b_t * jacobian d_state[2] = 0 return d_state def check_flux_surface_wander(self, solution): """ Check the maximum deviation from the flux-surface during trace. Since we're perpendicularly tracing, we necessarily will go off the flux surface. """ pass def polar_to_cart_chord(equi, rho, theta, max_radius=1.0, n_samples=1000): """Convert from polar to cartesian coordinates.""" xaxis = np.array(equi.axis_r_norm) yaxis = np.array(equi.axis_z_norm) radias = np.linspace(0, max_radius, n_samples) r_norm = radias * np.cos(theta) + xaxis z_norm = radias * np.sin(theta) + yaxis rho_values = equi.normalized_flux_surface_label(r_norm, z_norm) # NOTE: We manually set the first entry to zero due to possible very small # inaccuracies that might lead to finite rho at axis rho_values[0] = 0.0 xsol, ysol, rsol = [], [], [] for rhoval in np.atleast_1d(rho): roots = InterpolatedUnivariateSpline(radias, rho_values - rhoval).roots() if len(roots) == 0: print(f"rho value {rhoval} not found. Skipping") continue elif len(roots) > 1: raise RuntimeError( "Multiple roots found. Set max radius to ensure that rho is monotonic" ) xsol.append(roots[0] * np.cos(theta) + xaxis) ysol.append(roots[0] * np.sin(theta) + yaxis) rsol.append(rhoval) return np.array(xsol), np.array(ysol), np.array(rsol)