"""Routines for performing frequency analysis on data."""
import xarray as xr
import numpy as np
from pint import Unit
from typing import Union
from warnings import warn
from torx.units_handling_m import Quantity, check_units
from torx.normalization import Normalization
from torx.autodoc_decorators_m import autodoc_function
from .fourier_base_m import fourier_frequencies, fourier_transform_1d
[docs]
@autodoc_function
def frequency_analysis(
snaps: xr.DataArray,
norm: Union[Quantity, Normalization]=None,
apply_window: bool=False,
freq_unit: str="kHz",
):
"""
Perform a fast Fourier transformation in the tau dimension.
The required time normalization can be provided in three ways:
1) As a norm attribute in the tau dimension of the input 'snaps'
2) Via the 'norm' kwarg of type torx.Quantity with dimension time
3) Via the 'norm' kwarg of type torx.Normalization containing 'tau_0'
"""
_check_constant_time_sampling(snaps)
# Retrieve Fourier frequencies (in physical units)
frequencies = fourier_frequencies(snaps.tau.values, len(snaps.tau))
if norm is None:
try:
norm = snaps.tau.norm
except AttributeError:
warn("No normalization found in snaps.tau.attrs and none provided "\
"via 'norm' parameter, returning unnormalized frequencies!")
if norm is not None:
if isinstance(norm, Normalization):
frequencies *= (2 * np.pi / norm.tau_0).to(freq_unit)
elif isinstance(norm, Quantity):
check_units(norm, {"[time]":1}, "norm")
frequencies *= (2 * np.pi / norm).to(freq_unit)
else:
warn("Normalization for tau must be of type, 'torx.Quantity' or "\
"'torx.Normalization', returning unnormalized frequencies!")
# Dictionaries for apply_ufunc
kwargs_dict = dict(
x_vals=snaps.tau.values,
n_samples=len(snaps.tau),
apply_window=apply_window,
remove_mean=False,
normalize=True,
)
dask_gufunc_kwargs_dict = dict(
output_sizes=dict(frequency=len(snaps.tau) // 2),
allow_rechunk=True,
)
coords_dict = dict(frequency=frequencies)
# Parallel ufunc over 1D-Fourier transform
fourier_ds = xr.apply_ufunc(
fourier_transform_1d,
snaps.chunk({"tau": -1}),
input_core_dims=[["tau"],],
output_core_dims=[["frequency"]],
kwargs=kwargs_dict,
vectorize=True,
dask="parallelized",
dask_gufunc_kwargs=dask_gufunc_kwargs_dict,
output_dtypes=[np.cdouble]
).assign_coords(coords_dict)
if norm is not None:
fourier_ds.frequency.attrs["unit"] = Unit(freq_unit)
return fourier_ds
[docs]
@autodoc_function
def chunked_frequency_analysis(
snaps: xr.DataArray,
batch_size: int,
norm: Union[Quantity, Normalization]=None,
apply_window: bool=False,
freq_unit: str="kHz",
):
"""
Perform frequency analysis on chunked time intervals (batches).
Wrapper around the standard 'frequency_analysis'. The routine chunks the
data into several batches of 'batch_size' along tau and then performs the
'frequency_analysis' for each batch. The returned DataArray hence has
an additional dimension 'batch'. The idea is that by decreasing the
resolution along 'tau' per Fourier transform the statistics per frequency
bin may be increased with the same amount of real space data, provided
that the number of samples per batch does not become too small.
Note
----
The size of the tau-dimension must be exactly divisible by the
'batch_size' in order for the resulting Fourier amplitudes to be
aligned in the frequency dimension.
"""
_check_constant_time_sampling(snaps)
assert(len(snaps.tau) % batch_size == 0), \
"Size of tau dimension must be divisible by the 'batch_size'!"
n_batches = len(snaps.tau) // batch_size
tau_batches = np.split(snaps.tau.values, n_batches)
snaps_batched = snaps.sel(tau=tau_batches[0]).expand_dims("batch")
for tau_batch in tau_batches[1:]:
new_batch = snaps.sel(tau=tau_batch).expand_dims("batch")
snaps_batched = xr.concat(
[snaps_batched, new_batch], dim="batch", join="override"
)
return frequency_analysis(
snaps_batched, norm=norm, apply_window=apply_window, freq_unit=freq_unit
)
def _check_constant_time_sampling(snaps) -> None:
"""Check if snaps are equidistantly sampled along tau."""
dtau = float(snaps.tau[1] - snaps.tau[0])
for i in range(len(snaps.tau) - 1):
assert np.isclose(snaps.tau[i + 1] - snaps.tau[i], dtau), \
f"Unequally sampled data at tau[{i}]={snaps.tau[i].values} and " \
f"tau[{i + 1}]={snaps.tau[i + 1].values}"
dtau = snaps.tau[i + 1] - snaps.tau[i]
return