"""Classes for tracing magnetic field lines."""
import numpy as np
from typing import Union, List, Callable
from scipy.integrate import solve_ivp
from scipy.integrate._ivp.ivp import OdeResult
from scipy.interpolate import InterpolatedUnivariateSpline
from .vector_field_tracer_m import VectorFieldTracer
from torx.autodoc_decorators_m import autodoc_class
[docs]
@autodoc_class
class MagneticFieldTracer(VectorFieldTracer):
"""
Allows to trace along a magnetic fieldline.
Toroidal angle is denoted as phi. Poloidal angle as theta.
"""
[docs]
def __init__(self, equi, max_flux_surface_wander: float=1e-3):
"""Initialize the magnetic field tracer."""
# Only for NumericalEquilibrium the init of the parent class can be
# called with a the discrete magnetic vector field
if type(equi).__name__ == "NumericalEquilibrium":
r_norm, z_norm = equi.spline_basis_r, equi.spline_basis_z
vec_field = equi.magfield_vector(r_norm, z_norm)
super().__init__(vec_field)
self.equi = equi
self.max_flux_surface_wander = max_flux_surface_wander
[docs]
def toroidal_integration(
self,
r_initial: float,
z_initial: float,
phi_initial: float,
phi_to_max: float,
tolerance: float=1e-4,
method: str="DOP853",
direction: str="fwd",
check_out_of_domain: bool=False,
check_out_of_bounds: bool=False,
events: Union[List[Callable], Callable]=None,
**integrator_kwargs
) -> OdeResult:
"""
Perform an integration with phi as the independent variable.
Returns the (x,y) positions of the trace, as well as the fieldline
length.
"""
self.direction = direction
self._check_initial_state(r_initial, z_initial, phi_initial)
# Handle events and check format of the provided events
if check_out_of_domain:
events = self._add_event(events, self._out_of_domain)
if check_out_of_bounds:
events = self._add_event(events, self._out_of_bounds)
solution = solve_ivp(
fun=self.toroidal_integration_equation,
t_span=[phi_initial, phi_initial + self._dir_fac * phi_to_max],
y0=np.array([r_initial, z_initial, 0.0]),
events=events,
dense_output=True,
rtol=tolerance,
atol=tolerance,
method=method,
**integrator_kwargs,
)
if not np.isnan(self.max_flux_surface_wander):
self.check_flux_surface_wander(solution)
return solution
[docs]
def toroidal_integration_equation(self, phi, state):
"""
Equation that is required to integrate toroidally around the torus.
The fieldline length is returned as the third element of the state
vector.
"""
r_norm = state[0]
z_norm = state[1]
b_r = self.equi.magfield_component_r(r_norm, z_norm, phi)
b_z = self.equi.magfield_component_z(r_norm, z_norm, phi)
b_t = self.equi.magfield_component_toroidal(r_norm, z_norm, phi)
jacobian = self.equi.jacobian(r_norm, z_norm, phi)
d_state = np.zeros_like(state)
d_state[0] = b_r / b_t * jacobian
d_state[1] = b_z / b_t * jacobian
d_state[2] = (
np.sqrt(b_r * b_r / (b_t * b_t) + b_z * b_z / (b_t * b_t) + 1.0) * jacobian
)
return d_state
[docs]
def poloidal_integration(
self,
r_initial: float,
z_initial: float,
theta_max: float,
tolerance: float=1e-4,
method: str="DOP853",
check_out_of_domain: bool=False,
check_out_of_bounds: bool=False,
events: Union[List[Callable], Callable]=None,
**integrator_kwargs,
) -> OdeResult:
"""
Perform an integration with theta as the independent variable.
Valid only for the confined region.
Returns the (x,y) positions of the trace, as well as the fieldline
length.
"""
self._check_initial_state(r_initial, z_initial, 0.0)
# Handle events and check format of the provided events
if check_out_of_domain:
events = self._add_event(events, self._out_of_domain)
if check_out_of_bounds:
events = self._add_event(events, self._out_of_bounds)
# Make sure we are in the closed field line region
rho_initial = self.equi.normalized_flux_surface_label(r_initial,
z_initial).values
if rho_initial >= 1.0:
raise RuntimeError(
"Cannot perform poloidal integration for open fieldlines." \
+f"x={r_initial}, y= {z_initial}, rho={rho_initial}"
)
solution = solve_ivp(
fun=self.poloidal_integration_equation,
t_span=[0.0, theta_max],
y0=np.array([r_initial, z_initial, 0.0]),
events=events,
dense_output=True,
rtol=tolerance,
atol=tolerance,
method=method,
**integrator_kwargs,
)
self.check_flux_surface_wander(solution)
return solution
[docs]
def poloidal_integration_equation(self, _, state):
"""
Equation that is required to integrate poloidally around the torus.
The fieldline length is returned as the third element of the state
vector.
"""
r_norm = state[0]
z_norm = state[1]
b_r = self.equi.magfield_component_r(r_norm, z_norm)
b_z = self.equi.magfield_component_z(r_norm, z_norm)
theta = self.theta(r_norm, z_norm)
radius = self.minor_radius(r_norm, z_norm)
# Magnetic field component along the integration contour (theta)
b_along_fs = -b_r * np.sin(theta) + b_z * np.cos(theta)
# First check magnetic field obtained
b_pol = np.sqrt(b_r**2 + b_z**2)
# Magnetic field component across the integration contour (radial)
b_across_fs = b_r * np.cos(theta) + b_z * np.sin(theta)
b_pol_check = np.sqrt(b_across_fs**2 + b_along_fs**2)
assert np.isclose(b_pol_check / b_pol, 1.0
), f"Poloidal field on curve was {b_pol_check}, should be {b_pol}!"
# After check passed, calculate increments
d_state = np.zeros_like(state)
# The increments dR and dZ are given by simply integrating the
# equations for the magnetic field line (4.1.6) in [1] on
# the poloidal curve.
# [1] D'haeseleer, Flux coordinates and magnetic field structure
# NOTE: The poloidal line element is r*dtheta.
d_state[0] = radius * b_r / np.abs(b_along_fs)
d_state[1] = radius * b_z / np.abs(b_along_fs)
# The increment in symmetry angle is given by eq. (24) in [2].
# [2] Ribeiro, Conformal Tokamak Geometry for Turbulence Computations
# NOTE: Here we return the product q*dtheta_s. Since the contravariant
# component of the poloidal field is used, a factor 1/r enters the
# denominator.
# NOTE: The current I used in the formula is I = B_tor * R = B_tor_axis
# since we set B_tor = B_tor_axis / R per definition. We use the
# call to the toroidal field with axis coordinates since this
# returns the field with the same normalization as used above.
current = self.equi.magfield_component_toroidal(self.equi.axis_r_norm,
self.equi.axis_z_norm)
d_state[2] = current * np.abs(radius / b_along_fs / r_norm**2)
return d_state
[docs]
def check_flux_surface_wander(self, solution):
"""
Check the maximum deviation from the flux-surface during trace.
Throws an error if it exceeds the threshold.
"""
rho_initial = self.equi.normalized_flux_surface_label(
solution.y[0][0], solution.y[1][0]
).values
dense_output = solution.sol(
np.linspace(np.min(solution.t), np.max(solution.t), 1000)
)
rho_trace = self.equi.normalized_flux_surface_label(
dense_output[0], dense_output[1],
).values
flux_surface_wander = np.abs(rho_trace - rho_initial)
assert (
np.max(flux_surface_wander) < self.max_flux_surface_wander
), (
f"Flux surface wander exceeded threshold \
({np.max(flux_surface_wander):3.2e} > {self.max_flux_surface_wander:3.2e}) \
Try decreasing the tolerance of the integrator, or increasing the \
max_flux_surface_wander"
)
[docs]
def find_neighboring_points(
self, r_initial, z_initial, n_toroidal_planes: int=16
):
"""
Find the neighbors in both directions of an array of sample points.
Although this loop should be trivially parallel, dask parallelism
doesn't seem to work here.
"""
r_initial = np.atleast_1d(r_initial)
z_initial = np.atleast_1d(z_initial)
assert r_initial.shape == z_initial.shape
assert (
r_initial.ndim == 1 and z_initial.ndim == 1
), f"Should provide r and z as 1D arrays"
forward_trace = np.zeros((r_initial.size, 3))
reverse_trace = np.zeros((r_initial.size, 3))
print("Tracing", end=" ")
for i in range(r_initial.size):
print(f"{i}/{r_initial.size}", end=", ")
r_in, z_in = r_initial[i], z_initial[i]
forward_trace[i, :] = self.toroidal_integration(
r_in,
z_in,
+2.0 * np.pi / n_toroidal_planes,
).y[:, -1]
reverse_trace[i, :] = self.toroidal_integration(
r_in,
z_in,
-2.0 * np.pi / n_toroidal_planes,
).y[:, -1]
print("Done")
return forward_trace, reverse_trace
[docs]
def trace_reverse(self, r_initial, z_initial, n_phi_total, phi_initial = 0,
n_phi=1):
"""
Trace field lines back for a given amount of toroidal planes.
Starts at r,z initial and returns the new points r,z reverse.
"""
r_reverse = np.zeros(r_initial.size)
z_reverse = np.zeros(r_initial.size)
for i in range(r_initial.size):
r_in, z_in = r_initial[i], z_initial[i]
sol = self.toroidal_integration(
r_in,
z_in,
phi_initial,
(-2.0 * np.pi / n_phi_total) * n_phi,
)
r_reverse[i] = sol.y[0, -1]
z_reverse[i] = sol.y[1, -1]
return r_reverse, z_reverse
[docs]
def minor_radius(self, r_norm, z_norm):
"""Minor radius defined as Cartesian distance to magnetic axis."""
return np.sqrt((r_norm - self.equi.axis_r_norm.values)**2 +
(z_norm - self.equi.axis_z_norm.values)**2)
[docs]
def theta(self, r_norm, z_norm):
"""
Poloidal angle from the magnetic axis.
Note
----
The zero point is the outboard midplane, and theta ranges
from [-pi, pi].
"""
return np.arctan2(z_norm - self.equi.axis_z_norm.values,
r_norm - self.equi.axis_r_norm.values)
def _out_of_bounds(self, _, state):
"""
Check if integrator state is out of bounds.
Return 1 if the state of the integrator is inside the boundary
polygon, otherwise -1
"""
r, z = state[0], state[1]
boundary_polygon = self.equi.get_boundary_polygon()
return 1 if boundary_polygon.point_inside(r, z) else -1
_out_of_bounds.terminal = True
_out_of_bounds.direction = -1
@autodoc_class
class NormalTracer(MagneticFieldTracer):
"""Finds a line which is continuously normal to the poloidal field."""
def toroidal_integration_equation(self, _, state):
"""
Equation that is required to integrate toroidally around the torus.
The fieldline length is returned as the third element of the state
vector.
"""
r_norm, z_norm = state[0], state[1]
b_r, b_z, b_t, jacobian = (
self.equi.magfield_component_r(r_norm, z_norm),
self.equi.magfield_component_z(r_norm, z_norm),
self.equi.magfield_component_toroidal(r_norm, z_norm),
self.equi.jacobian(r_norm, z_norm),
)
d_state = np.zeros_like(state)
d_state[0] = -b_z / b_t * jacobian
d_state[1] = +b_r / b_t * jacobian
d_state[2] = 0
return d_state
def check_flux_surface_wander(self, solution):
"""
Check the maximum deviation from the flux-surface during trace.
Since we're perpendicularly tracing, we necessarily will go off the
flux surface.
"""
pass
def polar_to_cart_chord(equi, rho, theta, max_radius=1.0, n_samples=1000):
"""Convert from polar to cartesian coordinates."""
xaxis = np.array(equi.axis_r_norm)
yaxis = np.array(equi.axis_z_norm)
radias = np.linspace(0, max_radius, n_samples)
r_norm = radias * np.cos(theta) + xaxis
z_norm = radias * np.sin(theta) + yaxis
rho_values = equi.normalized_flux_surface_label(r_norm, z_norm)
# NOTE: We manually set the first entry to zero due to possible very small
# inaccuracies that might lead to finite rho at axis
rho_values[0] = 0.0
xsol, ysol, rsol = [], [], []
for rhoval in np.atleast_1d(rho):
roots = InterpolatedUnivariateSpline(radias, rho_values - rhoval).roots()
if len(roots) == 0:
print(f"rho value {rhoval} not found. Skipping")
continue
elif len(roots) > 1:
raise RuntimeError(
"Multiple roots found. Set max radius to ensure that rho is monotonic"
)
xsol.append(roots[0] * np.cos(theta) + xaxis)
ysol.append(roots[0] * np.sin(theta) + yaxis)
rsol.append(rhoval)
return np.array(xsol), np.array(ysol), np.array(rsol)