Source code for torx.analysis.fourier.frequency_analysis_m

"""Routines for performing frequency analysis on data."""
import xarray as xr
import numpy as np
from pint import Unit
from typing import Union
from warnings import warn

from torx.units_handling_m import Quantity, check_units
from torx.normalization import Normalization
from torx.autodoc_decorators_m import autodoc_function

from .fourier_base_m import fourier_frequencies, fourier_transform_1d

[docs] @autodoc_function def frequency_analysis( snaps: xr.DataArray, norm: Union[Quantity, Normalization]=None, apply_window: bool=False, freq_unit: str="kHz", ): """ Perform a fast Fourier transformation in the tau dimension. The required time normalization can be provided in three ways: 1) As a norm attribute in the tau dimension of the input 'snaps' 2) Via the 'norm' kwarg of type torx.Quantity with dimension time 3) Via the 'norm' kwarg of type torx.Normalization containing 'tau_0' """ _check_constant_time_sampling(snaps) # Retrieve Fourier frequencies (in physical units) frequencies = fourier_frequencies(snaps.tau.values, len(snaps.tau)) if norm is None: try: norm = snaps.tau.norm except AttributeError: warn("No normalization found in snaps.tau.attrs and none provided "\ "via 'norm' parameter, returning unnormalized frequencies!") if norm is not None: if isinstance(norm, Normalization): frequencies *= (2 * np.pi / norm.tau_0).to(freq_unit) elif isinstance(norm, Quantity): check_units(norm, {"[time]":1}, "norm") frequencies *= (2 * np.pi / norm).to(freq_unit) else: warn("Normalization for tau must be of type, 'torx.Quantity' or "\ "'torx.Normalization', returning unnormalized frequencies!") # Dictionaries for apply_ufunc kwargs_dict = dict( x_vals=snaps.tau.values, n_samples=len(snaps.tau), apply_window=apply_window, remove_mean=False, normalize=True, ) dask_gufunc_kwargs_dict = dict( output_sizes=dict(frequency=len(snaps.tau) // 2), allow_rechunk=True, ) coords_dict = dict(frequency=frequencies) # Parallel ufunc over 1D-Fourier transform fourier_ds = xr.apply_ufunc( fourier_transform_1d, snaps.chunk({"tau": -1}), input_core_dims=[["tau"],], output_core_dims=[["frequency"]], kwargs=kwargs_dict, vectorize=True, dask="parallelized", dask_gufunc_kwargs=dask_gufunc_kwargs_dict, output_dtypes=[np.cdouble] ).assign_coords(coords_dict) if norm is not None: fourier_ds.frequency.attrs["unit"] = Unit(freq_unit) return fourier_ds
[docs] @autodoc_function def chunked_frequency_analysis( snaps: xr.DataArray, batch_size: int, norm: Union[Quantity, Normalization]=None, apply_window: bool=False, freq_unit: str="kHz", ): """ Perform frequency analysis on chunked time intervals (batches). Wrapper around the standard 'frequency_analysis'. The routine chunks the data into several batches of 'batch_size' along tau and then performs the 'frequency_analysis' for each batch. The returned DataArray hence has an additional dimension 'batch'. The idea is that by decreasing the resolution along 'tau' per Fourier transform the statistics per frequency bin may be increased with the same amount of real space data, provided that the number of samples per batch does not become too small. Note ---- The size of the tau-dimension must be exactly divisible by the 'batch_size' in order for the resulting Fourier amplitudes to be aligned in the frequency dimension. """ _check_constant_time_sampling(snaps) assert(len(snaps.tau) % batch_size == 0), \ "Size of tau dimension must be divisible by the 'batch_size'!" n_batches = len(snaps.tau) // batch_size tau_batches = np.split(snaps.tau.values, n_batches) snaps_batched = snaps.sel(tau=tau_batches[0]).expand_dims("batch") for tau_batch in tau_batches[1:]: new_batch = snaps.sel(tau=tau_batch).expand_dims("batch") snaps_batched = xr.concat( [snaps_batched, new_batch], dim="batch", join="override" ) return frequency_analysis( snaps_batched, norm=norm, apply_window=apply_window, freq_unit=freq_unit )
def _check_constant_time_sampling(snaps) -> None: """Check if snaps are equidistantly sampled along tau.""" dtau = float(snaps.tau[1] - snaps.tau[0]) for i in range(len(snaps.tau) - 1): assert np.isclose(snaps.tau[i + 1] - snaps.tau[i], dtau), \ f"Unequally sampled data at tau[{i}]={snaps.tau[i].values} and " \ f"tau[{i + 1}]={snaps.tau[i + 1].values}" dtau = snaps.tau[i + 1] - snaps.tau[i] return