"""
Contains functions that handle the data storage infrastructure.
This means directory structure, file names and where data is stored.
"""
import xarray as xr
from pathlib import Path
import warnings
from torx import serialize_norm_attrs, deserialize_norm_attrs
from torx.performance import get_user, auto_chunk
from .hierarchy import get_active_hierarchy, get_modifier_hierarchy, \
create_hierarchy_from_storage
from .logger_m import tic, toc
from .file_io_m import write_dataset, read_dataset, set_lock_file
# Root partition under which the centralized storage tree lives (default /ptmp/).
storage_partition = Path("/ptmp/")
# Subdirectory name of the current run; "." means "no run name set yet"
# (check_run_name warns about this default).
run_name = "."
# When True, expand_storage_path returns custom_storage_path verbatim instead
# of building <storage_partition>/<user>/storx/<run_name>.
use_custom_storage_path = False
custom_storage_path = None
# Cache of the last integrity check: maps storage string -> bool (entry valid).
# None until check_storage_integrity populates it (store_dataset updates it too).
integrity_table = None
def set_storage_partition(path: Path):
    """
    Set the partition of the centralized storage path. Default is /ptmp/.

    Resets the option use_custom_storage_path to False so subsequent calls
    to expand_storage_path use the centralized layout again.

    :param path: Partition root under which the storage tree is created.
    """
    global storage_partition, use_custom_storage_path
    use_custom_storage_path = False
    storage_partition = Path(path)
    # Set lock file directory to storx path (expanded path without run name)
    set_lock_file(expand_storage_path().parent)
def set_run_name(rname: str):
    """
    Set the name of the run that is stored or loaded.

    :param rname: Subdirectory name used below the storage base path.
    """
    global run_name
    run_name = rname
def set_custom_storage_path(path: Path):
    """
    Set the option to use a custom storage path given by the path provided.

    The path will be taken directly, without expanding the user or run name.

    :param path: Directory used verbatim as the storage base path.
    """
    global use_custom_storage_path, custom_storage_path
    use_custom_storage_path = True
    custom_storage_path = Path(path)
def expand_storage_path():
    """
    Return the basepath of the centralized storage.

    Either <storage_partition>/<user>/storx/<run_name>, or the custom
    storage path verbatim when use_custom_storage_path is set.

    :returns: Base Path of the storage.
    :raises RuntimeError: if the custom-path option is active but no path
        was ever provided.
    """
    if use_custom_storage_path:
        # Guard against the option being set without a path (should only
        # happen if globals were manipulated directly).
        if custom_storage_path is None:
            raise RuntimeError("Set the option to use custom storage path, " \
                "but the path provided was None.")
        return custom_storage_path
    return storage_partition / get_user() / "storx" / run_name
def check_run_name():
    """Emit a RuntimeWarning when the default run_name is still in use."""
    # A custom storage path bypasses the run-name directory layout entirely,
    # so the warning only applies to the centralized layout.
    if use_custom_storage_path:
        return
    if run_name != ".":
        return
    warnings.warn("No run_name was set, the dataset will be stored "
                  "directly in the central storage, making the data vulnerable. "
                  "Please use set_run_name to set a subdirectory.",
                  RuntimeWarning)
def check_entries(entry: list, key: str):
    """
    Validate that a hierarchy lookup returned exactly one match.

    :param entry: List of matches returned by a hierarchy search.
    :param key: The key that was searched for, used in error messages.
    :returns: The single matching entry.
    :raises RuntimeError: if the list is empty or holds more than one match.
    """
    n_found = len(entry)
    if n_found == 0:
        raise RuntimeError(f"Could not find key {key} in hierarchy.")
    if n_found > 1:
        raise RuntimeError(f"Multiple entries found for {key} in hierarchy.")
    return entry[0]
def create_hierarchical_directory(storage_string: str):
    """
    Create the hierarchical directory structure from a storage string.

    The storage string contains the hierarchy where each level
    is separated by a "/". The last element in the storage string should name
    the file that should be created.
    Does no os operations if the directory exists.

    :param storage_string: "/"-separated hierarchy path ending in a file name.
    :returns: The full absolute path of the file that should be written.
    """
    path = expand_storage_path().absolute() / storage_string
    # Only the parent directories are created; the file itself is written
    # later by write_dataset.
    path.parent.mkdir(parents=True, exist_ok=True)
    return path
def build_storage_string(key: str, attrs: dict):
    """
    Create the storage string from a dataset.

    The key in the dataset will determine the filename. The attributes will
    determine the correct location in the storage. The level will be calculated
    from the entries of the key in the active hierarchy and the attributes in
    the modifier hierarchy.
    Assumes that all attributes of the dataset are present in the modifier
    hierarchy. Additional attributes are not accepted and will raise an error.

    :param key: Name of the (single) variable of the dataset.
    :param attrs: Attribute dict of the dataset (modifier name -> value).
    :returns: "/"-joined string "level_<n>/<modifier>-<value>/.../<filename>".
    :raises RuntimeError: if the key or an attribute is unknown to the
        hierarchies or matched more than once.
    """
    ha = get_active_hierarchy()
    entry = check_entries(ha.search_for_key(key), key)
    # Initialize the total level with the level of the variable, which is split
    # from a string "level_x/../../.."
    total_level = int(entry.split("/")[0].split("_")[1])
    hm = get_modifier_hierarchy()
    # The modifier list will contain strings that represent the attributes of
    # the dataset with their values. For example ["spec-ions", ..]
    modif_list = []
    for att in attrs:
        modif_entry = hm.search_for_key(att)
        # Check for empty before running the check function to print out
        # a custom error message in that case
        if modif_entry == []:
            raise RuntimeError(f"Attribute {att} in dataset is not supported " \
                "by the storage. Please remove that attribute from the " \
                "dataset or add it to the modifier hierarchy.")
        modif_entry = check_entries(modif_entry, att)
        # Add to total level by getting the level from the modifier
        total_level += int(modif_entry.split("/")[0].split("_")[1])
        # Append the correct string representation of this modifier
        modif_list.append(f"{att}-{attrs[att]}")
    # Sort modif list with order given in modifier hierarchy
    sort_order = [hm_key.split("/")[1] for hm_key in hm.get_all_keys()]

    def sort_key(string):
        # Position of the first hierarchy element contained in the modifier
        # string; every modifier validated above matches one of them.
        return next((i for i, ordered_element in enumerate(sort_order)
                     if ordered_element in string), None)

    modif_list.sort(key=sort_key)
    # Construct the total storage string by first creating a list of all
    # elements (level, modifiers, entry) and then combining them with the
    # correct separator
    parts = [f"level_{total_level}"] + modif_list + [entry.split("/")[-1]]
    return "/".join(parts)
def strip_norm_in_dataset(ds: xr.Dataset):
    """
    Strip-off the norm attribute in the Dataset if it is there.

    These should only be present in the DataArrays within the Dataset.

    :param ds: Dataset whose top-level attrs are inspected.
    :returns: The same dataset, with any top-level "norm" attribute removed.
    """
    # Sentinel so that a present-but-None "norm" value still triggers removal.
    missing = object()
    if ds.attrs.pop("norm", missing) is not missing:
        warnings.warn("Stripped-off norm attribute in Dataset. Norm should "
                      "only be present in the DataArray within. Make sure "
                      "a norm object exists there.",
                      RuntimeWarning)
    return ds
def store_dataset(ds: xr.Dataset, return_ds: bool=False, append=False):
    """
    Write an xarray dataset to file in the correct location of the storage.

    Creates the directory structure if not present. Automatically
    chunks the dataset and returns the updated result. Attributes within the
    dataset will be used to create the correct storage location, they must be
    present in the modifier hierarchy, otherwise an error will be raised.

    :param ds: Dataset containing exactly one data variable.
    :param return_ds: If True, return the re-chunked dataset.
    :param append: If True, append to the time dimension in the existing
        storage instead of writing a fresh entry.
    :raises NotImplementedError: if the dataset holds multiple variables.
    """
    global integrity_table
    keys = list(ds.keys())
    if len(keys) > 1:
        raise NotImplementedError(r"The dataset contains multiple variables, " \
            "this feature has not been implemented yet. Please use datasets " \
            "with single variables only.")
    check_run_name()
    ds = strip_norm_in_dataset(ds)
    ds = serialize_norm_attrs(ds)
    storage_string = build_storage_string(keys[0], ds.attrs)
    path = create_hierarchical_directory(storage_string)
    tic(keys[0])
    ds = auto_chunk(ds)
    write_dataset(ds, path, append)
    # Keep the integrity cache in sync: a freshly written entry is valid.
    if integrity_table is not None:
        integrity_table[storage_string] = True
    toc()
    if return_ds:
        return ds
def load_dataset(key: str, attrs: dict=None, storage_string: str=None):
    """
    Load an xarray dataset from the correct location in the storage.

    The dataset must be specified by a key for the name of the
    variable. Additionally a dict of attributes is required if the variable
    was modified by operations specified in the modifier hierarchy.
    Alternatively to the attribute dict, one may specify the storage string
    obtained by searching through the storage first.

    :param key: Name of the variable to load.
    :param attrs: Modifier attributes of the variable (default: none).
    :param storage_string: Pre-computed storage string; when given,
        build_storage_string is skipped.
    :returns: The loaded, re-chunked dataset with attrs re-assigned.
    :raises NotImplementedError: if multiple keys are passed.
    """
    # NOTE: the former default was a mutable [] — an empty dict behaves
    # identically in every use below and avoids the shared-default pitfall.
    if attrs is None:
        attrs = {}
    if not isinstance(key, str) and len(key) > 1:
        raise NotImplementedError(r"The dataset contains multiple variables, " \
            "this feature has not been implemented yet. Please use datasets " \
            "with single variables only.")
    if storage_string is None:
        storage_string = build_storage_string(key, attrs)
    path = create_hierarchical_directory(storage_string)
    tic(key, otype="load")
    ds = read_dataset(path)
    ds = deserialize_norm_attrs(ds)
    ds = ds.assign_attrs(attrs)
    ds[key] = ds[key].assign_attrs(attrs)
    ds = auto_chunk(ds)
    toc()
    return ds
def entry_exists(key: str, attrs: dict=None, storage_string: str=None):
    """
    Return true if an entry exists in the storage.

    Takes the key and attribute dict or alternatively key and storage string.

    :param key: Name of the variable.
    :param attrs: Modifier attributes of the variable (default: none).
    :param storage_string: Pre-computed storage string; when given,
        build_storage_string is skipped.
    :returns: True when the exact storage string is present in the storage
        metadata; False otherwise, including when the key/attrs are unknown
        to the hierarchies.
    """
    if attrs is None:
        attrs = {}
    if storage_string is None:
        try:
            storage_string = build_storage_string(key, attrs)
        except Exception:
            # Unknown key or attribute: the entry cannot exist in storage.
            return False
    hs = get_storage_metadata()
    found_entries = hs.search_for(storage_string)
    # search_for may return partial matches; require an exact hit.
    return any(storage_string == fe for fe in found_entries)
def valid_entry_exists(key: str, attrs: dict=None, storage_string: str=None):
    """
    Return true if a valid entry exists in the storage.

    Takes the key and attribute dict or alternatively key and storage string.
    Runs a full integrity check on first use to populate the internal
    integrity table.

    :param key: Name of the variable.
    :param attrs: Modifier attributes of the variable (default: none).
    :param storage_string: Pre-computed storage string; when given,
        build_storage_string is skipped.
    :returns: True when the entry exists and passed the integrity check.
    """
    if attrs is None:
        attrs = {}
    if not entry_exists(key, attrs, storage_string):
        return False
    # Lazily build the integrity table on the first validity query.
    if integrity_table is None:
        check_storage_integrity()
    if storage_string is None:
        storage_string = build_storage_string(key, attrs)
    return integrity_table[storage_string]
def check_storage_integrity():
    """
    Perform an integrity check of the entire storage.

    Stores the result in an internal integrity table for later use.

    :returns: Dict mapping each storage key to True (valid) or False
        (corrupt). Entries that only lack a checksum attribute (written by
        an older library version) are treated as valid.
    """
    global integrity_table
    keys = get_storage_metadata().get_all_keys()
    tic("storage", otype="integrity check")
    integrity_table = {}
    warning_keys = []
    failed_keys = []
    # NOTE: We convert warnings to errors temporarily so we can catch them
    # with try/except. catch_warnings restores the caller's filter state on
    # exit — even on an unexpected exception — instead of force-resetting
    # the global filters to "default" as the previous implementation did.
    with warnings.catch_warnings():
        warnings.simplefilter("error")
        for key in keys:
            try:
                path = create_hierarchical_directory(key)
                # Perform check by executing read with perform_check option
                read_dataset(path, perform_check=True)
                valid = True
            except UserWarning:
                # Missing checksum attribute: legacy entry, treated as ok.
                warning_keys.append(key)
                valid = True
            except Exception:
                failed_keys.append(key)
                valid = False
            integrity_table[key] = valid
    toc()
    if warning_keys:
        keys_to_print = "\n".join(warning_keys)
        warnings.warn("Some entries in the storage contain no checksum " \
            "attribute which means they were stored with an older " \
            "version of the library. These were treated as ok. " \
            f"Keys with warnings: {keys_to_print}",
            UserWarning)
    if failed_keys:
        keys_to_print = "\n".join(failed_keys)
        warnings.warn("Some entries in the storage are corrupt. " \
            f"Invalid keys: {keys_to_print}",
            UserWarning)
    return integrity_table