Source code for torx.specializations.grillix.snaps.load_snaps_m

"""Routines to convert grillix snap-files into an xarray dataset."""
import os
import dask
import xarray as xr
from glob import glob
from pathlib import Path
from typing import Union
from numpy import ndarray

from .find_timestamps_m import find_timestamps
from torx.autodoc_decorators_m import autodoc_function
from torx.performance import auto_chunk

# Lookup table keyed by the variable name used inside the snap-files.
# Each value is a pair: (name of the variable in torx,
#                        True if the variable is on the staggered grid).
variable_dict = {
    "ne": ("density", False),
    "te": ("electron_temp", False),
    "ti": ("ion_temp", False),
    "pot": ("potential", False),
    "vort": ("vorticity", False),
    "jpar": ("current", True),
    "upar": ("velocity", True),
    "apar": ("apar", True),
    "neutrals_dens": ("neutrals_density", False),
    "neutrals_parmom": ("neutrals_momentum", True),
    "neutrals_pressure": ("neutrals_pressure", False),
    "apar_shift": ("apar_shift", True),
}

# Names applied to the braginskii snap-file on load: both its variables and
# its dimensions are renamed, for consistency with previous torx versions.
rename_dict_brag = {
    "ne": "density",
    "te": "electron_temp",
    "ti": "ion_temp",
    "pot": "potential",
    "vort": "vorticity",
    "upar": "velocity",
    "jpar": "current",
    "apar": "apar",
    "snaps": "tau",
    "planes": "phi",
}

# Dimension-only renaming, used for snap-files whose variables are renamed
# separately.
rename_dict_dims = {
    "snaps": "tau",
    "planes": "phi",
}

def _merge_auxiliary_snap(
    snap: xr.Dataset,
    root_filepath: Path,
    time: int,
    subdir: str,
    flag_name: str,
    brag_file: Path,
) -> xr.Dataset:
    """
    Open the snap-file for 'time' found in 'root_filepath/subdir' and merge it into 'snap'.

    'flag_name' is the name of the caller's keyword that requested this file
    and 'brag_file' is the braginskii file 'snap' was read from; both are used
    only to build the assertion messages.

    Fails with an AssertionError if the directory or the file is missing, or
    if the tau values of the file differ from those of 'snap'.
    """
    aux_dir = root_filepath.joinpath(subdir)
    assert aux_dir.exists(), \
        f"{aux_dir} does not exist, but '{flag_name}' set to True!"

    aux_file = aux_dir.joinpath(f"snaps_t{time:05d}.nc")
    assert aux_file.exists(), f"{aux_file} does not exist!"

    aux_snap = xr.open_dataset(aux_file, chunks="auto").rename(rename_dict_dims)

    # Both files must describe the same time point before they are merged
    assert (snap.tau.values == aux_snap.tau.values).all(), \
        f"tau at time={time} not matching in files {brag_file} and {aux_file}"
    aux_snap = aux_snap.assign_coords({"tau": aux_snap.tau.load()})

    # Rename only those variables of this file that have an entry in variable_dict
    aux_snap = aux_snap.rename(
        {var: variable_dict[var][0] for var in aux_snap.data_vars if var in variable_dict}
    )

    return xr.merge(
        [snap, aux_snap],
        compat="no_conflicts",
        join="outer",
        combine_attrs="drop_conflicts",
    )


@autodoc_function
def load_single_snap(
    root_filepath: Path,
    time: int,
    load_neutrals: bool = True,
    load_apar_shift: bool = False
) -> xr.Dataset:
    """
    Load a single snapshot at given time.

    The braginskii data is read from
    'root_filepath/snapsdir_braginskii/snaps_t<time, 5 digits>.nc' and its
    variables and dimensions are renamed according to rename_dict_brag.

    If 'load_neutrals' is True, the file of the same name in
    'root_filepath/snapsdir_neutrals' is merged in; if 'load_apar_shift' is
    True, the same is done for 'root_filepath/snapsdir_apar_shift'.

    Note
    ----
    A requested file that does not exist is an error (AssertionError), it is
    not skipped. Set the corresponding flag to False for runs without
    neutrals or apar_shift output.

    A dimension named 'npoints_max' is renamed to 'points', and the result is
    passed through auto_chunk before it is returned.
    """
    brag_file = root_filepath.joinpath(f"snapsdir_braginskii/snaps_t{time:05d}.nc")
    assert brag_file.exists(), f"{brag_file} does not exist!"

    snap = xr.open_dataset(brag_file, chunks="auto").rename(rename_dict_brag)
    # Load the tau coordinate values eagerly; they are compared against the
    # tau values of every auxiliary file below.
    snap = snap.assign_coords({"tau": snap.tau.load()})

    if load_neutrals:
        snap = _merge_auxiliary_snap(
            snap, root_filepath, time, "snapsdir_neutrals", "load_neutrals", brag_file
        )

    if load_apar_shift:
        snap = _merge_auxiliary_snap(
            snap, root_filepath, time, "snapsdir_apar_shift", "load_apar_shift", brag_file
        )

    if "npoints_max" in snap.dims:
        snap = snap.rename({"npoints_max": "points"})

    return auto_chunk(snap)
@autodoc_function
def load_last_snap(
    root_filepath: Path,
    load_neutrals: bool = True,
    load_apar_shift: bool = False,
    return_fail: bool = True
) -> xr.Dataset:
    """
    Return the last available snapshot.

    The timestamps are taken from the files matching 'snaps_t*' in
    'root_filepath/snapsdir_braginskii'; the last entry returned by
    find_timestamps is loaded with load_single_snap.

    Note
    ----
    'return_fail' is accepted but not used by this function.

    An IndexError is raised if no timestamps are found.
    """
    snaps_dir = root_filepath.joinpath("snapsdir_braginskii")
    t_list_avail = find_timestamps(directory=snaps_dir, wildcard="snaps_t*")

    # Explicit length check (instead of truthiness) so that this works whether
    # find_timestamps hands back a list or an array. Same exception type as a
    # failing [-1] lookup, but with a message that names the directory.
    if len(t_list_avail) == 0:
        raise IndexError(f"No snaps matching 'snaps_t*' found in {snaps_dir}")

    # NOTE(review): assumes find_timestamps returns the timestamps in
    # ascending order, so that the last entry is the latest snap - confirm.
    last_t = t_list_avail[-1]

    return load_single_snap(root_filepath, last_t, load_neutrals, load_apar_shift)
@autodoc_function
def load_snaps_from_list(
    root_filepath: Path,
    t_list: Union[list, ndarray],
    load_neutrals: bool = True,
    load_apar_shift: bool = False
) -> xr.Dataset:
    """
    Build one xarray dataset from the snaps whose timestamps are in t_list.

    Every entry of t_list is loaded with load_single_snap and the results are
    combined by their coordinates. Braginskii and neutral snaps have to be
    aligned: when neutrals are loaded, each t in t_list must be present in
    both snapsdir_braginskii and snapsdir_neutrals, and files with the same t
    are taken to belong to the same time point.

    Example
    -------
    For t_list = [1, 4, 7] the returned xr.Dataset holds the data of
    [snaps_t00001.nc, snaps_t00004.nc, snaps_t00007.nc] found in
    root_filepath/snapsdir_braginskii and root_filepath/snapsdir_neutrals
    """
    per_time_snaps = []
    for timestamp in t_list:
        single = load_single_snap(root_filepath, timestamp, load_neutrals, load_apar_shift)
        per_time_snaps.append(single)

    combined = xr.combine_by_coords(
        data_objects=per_time_snaps,
        combine_attrs="drop_conflicts",
    )
    return combined
@autodoc_function
def load_snaps_in_range(
    root_filepath: Path,
    start_snap: int,
    end_snap: int,
    step: int = 1,
    load_neutrals: bool = True,
    load_apar_shift: bool = False
) -> xr.Dataset:
    """
    Load the snaps with timestamps in [start_snap, end_snap], both end-points included.

    The available timestamps are read from 'root_filepath/snapsdir_braginskii'.
    An integer 'step' >= 1 may be given to keep only every step-th of the
    selected time-points. 'load_neutrals' and 'load_apar_shift' are handed on
    unchanged to load_snaps_from_list.
    """
    braginskii_dir = root_filepath.joinpath("snapsdir_braginskii")
    available = find_timestamps(directory=braginskii_dir, wildcard="snaps_t*")

    # Keep the timestamps inside the closed interval, in their original order
    selected = []
    for timestamp in available:
        if start_snap <= timestamp <= end_snap:
            selected.append(timestamp)

    strided = selected[::step]
    return load_snaps_from_list(root_filepath, strided, load_neutrals, load_apar_shift)
@autodoc_function
def load_snaps(
    root_filepath: Path,
    length: int = 1,
    step: int = 1,
    load_neutrals: bool = True,
    load_apar_shift: bool = False,
    load_all: bool = False
) -> xr.Dataset:
    """
    Load the last 'length' snaps, taking every 'step'-th of them.

    The available timestamps are read from 'root_filepath/snapsdir_braginskii'.
    'load_neutrals' and 'load_apar_shift' are handed on unchanged to
    load_snaps_from_list.

    Note
    ----
    If load_all=True the 'length' keyword parameter is ignored and all
    available snaps are loaded with 'step' as stride. This may take long and
    might not be what you intended! The same happens if 'length' exceeds the
    number of available snaps.

    Unless load_all=True, 'length' must be >= 1; otherwise a ValueError is
    raised.
    """
    t_list_avail = find_timestamps(
        directory=root_filepath.joinpath("snapsdir_braginskii"),
        wildcard="snaps_t*"
    )

    if length > len(t_list_avail) or load_all:
        print(f"Loading all {len(t_list_avail)} available snaps of",
              f"{root_filepath} with stride {step}.")
        t_list = t_list_avail[::step]
    else:
        # A slice start of -0 is 0, so length=0 would select *every* snap, and
        # a negative length would drop leading snaps instead of selecting
        # trailing ones. Reject both rather than load something unintended.
        if length < 1:
            raise ValueError(
                f"'length' must be >= 1 (got {length}); "
                "use load_all=True to load all available snaps."
            )
        t_list = t_list_avail[-length::step]

    return load_snaps_from_list(root_filepath, t_list, load_neutrals, load_apar_shift)