# Source code for torx.specializations.grillix.snaps.load_diags_m

"""Routines to convert grillix diagnostics-files into xarray.Datasets."""
import os
import dask
import xarray as xr
from glob import glob
from pathlib import Path
from typing import Union
from numpy import ndarray

import torx
from .find_timestamps_m import find_timestamps
from torx.autodoc_decorators_m import autodoc_function
from torx.performance import auto_chunk

@autodoc_function
def load_single_diag(
    root_filepath: Path,
    diag_type: str,
    t: int,
    map_to_rho: bool = True,
    concat_d: bool = True,
) -> xr.Dataset:
    """
    Load a single snapshot at timepoint t with neutrals if available.

    Parameters
    ----------
    root_filepath
        Simulation root directory containing 'diagnostics_<diag_type>'.
    diag_type
        Diagnostics kind, e.g. "field" or "zonal".
    t
        Timestamp index of the snapshot file to load.
    map_to_rho
        For zonal diagnostics, map 'dim_zonal' to the flux-surface label rho.
    concat_d
        For field diagnostics, concatenate all 'fields_d???_*' files along
        the snaps/tau index instead of loading only 'fields_d001_*'.

    Returns
    -------
    xr.Dataset, re-chunked via torx.performance.auto_chunk.

    Raises
    ------
    FileNotFoundError
        If the requested diagnostics file(s) do not exist.
    """
    if diag_type == "field":
        diag = _load_field_diag(root_filepath, t, concat_d)
    else:
        diag = _load_scalar_diag(root_filepath, diag_type, t)

    # In the case of zonal diagnostics: map dim_zonal to flux surface label rho
    if diag_type == "zonal" and map_to_rho:
        diag = map_diag_to_rho(root_filepath, diag)

    if "npoints_max" in diag.dims:
        diag = diag.rename({"npoints_max": "points"})

    return auto_chunk(diag)


def _load_field_diag(root_filepath: Path, t: int, concat_d: bool) -> xr.Dataset:
    """Load one field snapshot, optionally concatenating the d-files along tau."""
    diag_dir = root_filepath / "diagnostics_field"
    if concat_d:
        print("Concatenating along d!")
        wildcard = str(diag_dir / f"fields_d???_t{int(t):05d}.nc")
        diag_files = sorted(glob(wildcard))
        # Fail with a clear message instead of letting xr.concat([]) blow up.
        if not diag_files:
            raise FileNotFoundError(f"No files matching {wildcard}")
        diag = xr.concat(
            [xr.open_dataset(f, chunks="auto") for f in diag_files],
            dim="snaps",
        )
    else:
        diag = xr.open_dataset(
            diag_dir / f"fields_d001_t{int(t):05d}.nc",
            chunks="auto",
        )
    # Rename to the (tau, phi) convention; tau coords come from the
    # pre-rename dataset's 'tau' variable, as before.
    return diag.rename({"snaps": "tau", "planes": "phi"}).assign_coords(
        {"tau": diag.tau.values}
    )


def _load_scalar_diag(root_filepath: Path, diag_type: str, t: int) -> xr.Dataset:
    """Load a non-field diagnostic snapshot from its single NetCDF file."""
    diag_file = root_filepath.joinpath(
        f"diagnostics_{diag_type}", f"diags_{diag_type}_t{int(t):05d}.nc"
    )
    # Raise explicitly: an assert would be stripped under 'python -O'.
    if not diag_file.exists():
        raise FileNotFoundError(f"{diag_file} does not exist!")
    diag = xr.open_dataset(diag_file, chunks="auto")
    return diag.rename({"n_diag": "tau"}).assign_coords(
        {"tau": diag.tau.isel(dim_scalar=0).values}
    )
def map_diag_to_rho(root_filepath: Path, diag: xr.Dataset) -> xr.Dataset:
    """
    Assign flux surface label from trunk/polars.nc to zonal diagnostics.

    Renames 'dim_zonal' to 'rho' and attaches the flux-surface label values
    read from <root_filepath>/trunk/polars.nc as coordinates.

    Raises
    ------
    FileNotFoundError
        If trunk/polars.nc is missing (use map_to_rho=False in that case).
    ValueError
        If the diagnostics and polars.nc disagree on the number of
        flux surfaces.
    """
    polars_file = root_filepath.joinpath("trunk", "polars.nc")
    # Raise explicitly: asserts are stripped under 'python -O'.
    if not polars_file.exists():
        raise FileNotFoundError(
            f"No 'polars.nc' in {root_filepath}/trunk. Set map_to_rho=False"
        )
    polars = torx.specializations.grillix.Polars(polars_file)
    if len(diag.dim_zonal) != len(polars.rho):
        raise ValueError(
            f"{root_filepath.parts[-1]} and {polars_file.parts[-1]} "
            "don't match in rho -> map_to_rho=False"
        )
    return diag.rename({"dim_zonal": "rho"}).assign_coords(
        {"rho": polars.rho.values}
    )
@autodoc_function
def load_diags_from_list(
    root_filepath: Path,
    diag_type: str,
    t_list: Union[list, ndarray],
    map_to_rho: bool = True,
    concat_d: bool = True,
) -> xr.Dataset:
    """
    Return an xarray dataset with data corresponding to the diagnostics listed.

    Each timestamp in t_list is loaded individually and the resulting
    datasets are merged by coordinate.

    Example
    -------
    If you provide t_list = [1, 4, 7], the function will return an xr.Dataset
    with the data contained in
    [diags_diag_type_t00001.nc, diags_diag_type_t00004.nc,
    diags_diag_type_t00007.nc] in root_filepath/diagnostics_diag_type.
    """
    snapshots = [
        load_single_diag(root_filepath, diag_type, timestamp, map_to_rho, concat_d)
        for timestamp in t_list
    ]
    return xr.combine_by_coords(data_objects=snapshots)
@autodoc_function
def load_diags_in_range(
    root_filepath: Path,
    diag_type: str,
    t_min: int,
    t_max: int,
    step: int = 1,
    map_to_rho: bool = True,
    concat_d: bool = True,
) -> xr.Dataset:
    """
    Load diagnostics in the range [t_min, t_max] (including end-points).

    You may provide an integer 'step' >= 1, to reduce the number of
    time-points returned.

    Raises
    ------
    FileNotFoundError
        If no diagnostics of 'diag_type' exist at all.
    ValueError
        If no diagnostics fall inside [t_min, t_max].
    """
    wildcard = "fields_d001_t*" if diag_type == "field" else f"diags_{diag_type}_t*"
    diag_path = root_filepath / f"diagnostics_{diag_type}"
    t_list_avail = find_timestamps(directory=diag_path, wildcard=wildcard)
    # Raise explicitly: asserts are stripped under 'python -O'.
    if not t_list_avail:
        raise FileNotFoundError(f"No {diag_type} diagnostics found in {diag_path}")
    t_list = [t for t in t_list_avail if t_min <= t <= t_max]
    if not t_list:
        # The original message interpolated '{t_min, t_max}' — a tuple
        # expression — rendering e.g. "[(1, 5)]"; spell the bounds out.
        raise ValueError(
            f"{diag_type} diagnostics not available for range [{t_min}, {t_max}]"
        )
    return load_diags_from_list(
        root_filepath, diag_type, t_list[::step], map_to_rho, concat_d
    )
@autodoc_function
def load_diags(
    root_filepath: Path,
    diag_type: str,
    length: int = -1,
    step: int = 1,
    map_to_rho: bool = True,
    concat_d: bool = True,
) -> xr.Dataset:
    """
    Load all grillix diagnostics of 'diag_type'.

    With the 'length' parameter the last 'length' diagnostics snapshots are
    loaded; length <= 0 (the default -1) loads all available snapshots.
    'step' thins the selection by taking every step-th timestamp.

    Raises
    ------
    FileNotFoundError
        If no diagnostics of 'diag_type' exist at all.
    """
    wildcard = "fields_d001_t*" if diag_type == "field" else f"diags_{diag_type}_t*"
    diag_path = root_filepath / f"diagnostics_{diag_type}"
    t_list_avail = find_timestamps(directory=diag_path, wildcard=wildcard)
    # Raise explicitly: asserts are stripped under 'python -O'.
    if not t_list_avail:
        raise FileNotFoundError(f"No {diag_type} diagnostics found in {diag_path}")
    # Guard with 'length > 0': the old 'length > -1' let length == 0 slip
    # through, and t_list_avail[-0::step] silently selected *all* snapshots
    # without the "loading all" notice below.
    if length > 0:
        t_list = t_list_avail[-length::step]
    else:
        print(
            f"Loading all {len(t_list_avail)} available {diag_type} diags of",
            f"{root_filepath} with stride {step}.",
        )
        t_list = t_list_avail[::step]
    return load_diags_from_list(
        root_filepath, diag_type, t_list, map_to_rho=map_to_rho, concat_d=concat_d
    )