Source code for storx.storage_m

"""
Contains functions that handle the data storage infrastructure.

This means directory structure, file names and where data is stored.
"""
import xarray as xr
from pathlib import Path
import warnings
from torx import serialize_norm_attrs, deserialize_norm_attrs
from torx.performance import get_user, auto_chunk

from .hierarchy import get_active_hierarchy, get_modifier_hierarchy, \
                       create_hierarchy_from_storage
from .logger_m import tic, toc
from .file_io_m import write_dataset, read_dataset, set_lock_file

storage_partition = Path("/ptmp/")
run_name = "."

use_custom_storage_path = False
custom_storage_path = None

integrity_table = None

def set_storage_partition(path: Path):
    """
    Set the partition of the centralized storage path.

    Default is /ptmp/. Resets the option use_custom_storage_path to False.
    """
    global storage_partition, use_custom_storage_path
    use_custom_storage_path = False
    storage_partition = Path(path)
    # Set lock file directory to the storx path (expanded path without run name)
    set_lock_file(expand_storage_path().parent)

def set_run_name(rname: str):
    """Set the name of the run that is stored or loaded."""
    global run_name
    run_name = rname

def set_custom_storage_path(path: Path):
    """
    Set the option to use a custom storage path given by the path provided.

    The path will be taken directly, without expanding the user or run name.
    """
    global use_custom_storage_path, custom_storage_path
    use_custom_storage_path = True
    custom_storage_path = Path(path)

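# Illustrative usage (a sketch; the partition, run name, and custom path are
# made up): configure the storage location before storing or loading data.
#
#     import storx.storage_m as storage
#     storage.set_storage_partition("/ptmp/")    # centralized storage partition
#     storage.set_run_name("run_2024_01")        # subdirectory for this run
#     # or bypass the user/run-name expansion entirely:
#     # storage.set_custom_storage_path("/scratch/project/storx")
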
def expand_storage_path():
    """Return the base path of the centralized storage."""
    if not use_custom_storage_path:
        return storage_partition / get_user() / "storx" / run_name
    else:
        if custom_storage_path is None:
            raise RuntimeError("Set the option to use custom storage path, "
                               "but the path provided was None.")
        return custom_storage_path


def check_run_name():
    """Check the run_name and print a warning if it is left at the default."""
    if not use_custom_storage_path and run_name == ".":
        warnings.warn("No run_name was set, the dataset will be stored "
                      "directly in the central storage, making the data "
                      "vulnerable. Please use set_run_name to set a "
                      "subdirectory.", RuntimeWarning)


def check_entries(entry: list, key: str):
    """Check that exactly one entry was found for key and return it."""
    if len(entry) == 0:
        raise RuntimeError(f"Could not find key {key} in hierarchy.")
    if len(entry) > 1:
        raise RuntimeError(f"Multiple entries found for {key} in hierarchy.")
    return entry[0]


def create_hierarchical_directory(storage_string: str):
    """
    Create the hierarchical directory structure from a storage string.

    The storage string contains the hierarchy, with each level separated by
    a "/". The last element of the storage string names the file that should
    be created. Performs no OS operations if the directory already exists.
    Returns the full path of the file that should be written.
    """
    path = expand_storage_path().absolute()
    path = path / storage_string
    dir_name = path.parent
    dir_name.mkdir(parents=True, exist_ok=True)
    return path


def build_storage_string(key: str, attrs: dict):
    """
    Create the storage string for a dataset.

    The key of the dataset determines the filename. The attributes determine
    the correct location in the storage. The level is calculated from the
    entries of the key in the active hierarchy and of the attributes in the
    modifier hierarchy.

    Assumes that all attributes of the dataset are present in the modifier
    hierarchy. Additional attributes are not accepted and will raise an
    error.
    """
    ha = get_active_hierarchy()
    entry = ha.search_for_key(key)
    entry = check_entries(entry, key)
    # Initialize the total level with the level of the variable, which is
    # split from a string "level_x/../../.."
    total_level = int(entry.split("/")[0].split("_")[1])
    hm = get_modifier_hierarchy()
    # The modifier list will contain strings that represent the attributes
    # of the dataset with their values, for example ["spec-ions", ...]
    modif_list = []
    for att in attrs:
        modif_entry = hm.search_for_key(att)
        # Check for empty before running the check function, to print a
        # custom error message in that case
        if modif_entry == []:
            raise RuntimeError(f"Attribute {att} in dataset is not supported "
                               "by the storage. Please remove that attribute "
                               "from the dataset or add it to the modifier "
                               "hierarchy.")
        modif_entry = check_entries(modif_entry, att)
        # Add to the total level by getting the level from the modifier
        total_level += int(modif_entry.split("/")[0].split("_")[1])
        # Extend the modifier list by the correct string representation
        modif_list.append(f"{att}-{attrs[att]}")
    # Sort the modifier list with the order given in the modifier hierarchy
    sort_order = [k.split("/")[1] for k in hm.get_all_keys()]

    def sort_key(string):
        return next((i for i, ordered_element in enumerate(sort_order)
                     if ordered_element in string), None)

    modif_list.sort(key=sort_key)
    # Construct the total storage string by first creating a list of all
    # elements (level, modifiers, entry) and then joining them with the
    # correct separator
    string = [f"level_{total_level}"] + modif_list + [entry.split("/")[-1]]
    return "/".join(string)


def strip_norm_in_dataset(ds: xr.Dataset):
    """
    Strip off the norm attribute of the Dataset if it is there.

    Norm attributes should only be present on the DataArrays within the
    Dataset.
    """
    if "norm" in ds.attrs:
        del ds.attrs["norm"]
        warnings.warn("Stripped off norm attribute of the Dataset. Norm "
                      "should only be present on the DataArrays within. "
                      "Make sure a norm object exists there.", RuntimeWarning)
    return ds

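# Illustrative example (hypothetical hierarchy entries): a variable "density"
# registered as "level_1/density" in the active hierarchy, combined with the
# attribute {"spec": "ions"} registered as "level_1/spec" in the modifier
# hierarchy, would yield the storage string
#
#     level_2/spec-ions/density
#
# i.e. the levels add up, each modifier becomes a directory, and the key
# names the file.
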
def store_dataset(ds: xr.Dataset, return_ds: bool = False, append=False):
    """
    Write an xarray dataset to file in the correct location of the storage.

    Creates the directory structure if not present. Automatically chunks the
    dataset and returns the updated result. Attributes within the dataset
    are used to create the correct storage location; they must be present in
    the modifier hierarchy, otherwise an error will be raised.

    If return_ds is True, returns the re-chunked dataset. If append is True,
    appends to the time dimension of the existing storage.
    """
    global integrity_table
    keys = list(ds.keys())
    if len(keys) > 1:
        raise NotImplementedError("The dataset contains multiple variables; "
                                  "this feature has not been implemented yet. "
                                  "Please use datasets with single variables "
                                  "only.")
    check_run_name()
    ds = strip_norm_in_dataset(ds)
    ds = serialize_norm_attrs(ds)
    storage_string = build_storage_string(keys[0], ds.attrs)
    path = create_hierarchical_directory(storage_string)
    tic(keys[0])
    ds = auto_chunk(ds)
    write_dataset(ds, path, append)
    if integrity_table is not None:
        integrity_table[storage_string] = True
    toc()
    if return_ds:
        return ds

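# Illustrative usage (a sketch; the variable name and attribute are made up
# and must exist in the active and modifier hierarchies):
#
#     ds = xr.Dataset({"density": data}, attrs={"spec": "ions"})
#     ds = store_dataset(ds, return_ds=True)   # write and get re-chunked ds
#     store_dataset(ds_next, append=True)      # append along time dimension
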
def load_dataset(key: str, attrs: dict = None, storage_string: str = None):
    """
    Load an xarray dataset from the correct location in the storage.

    The dataset must be specified by a key for the name of the variable.
    Additionally, a dict of attributes is required if the variable was
    modified by operations specified in the modifier hierarchy. Instead of
    the attribute dict, one may specify the storage string obtained by
    searching through the storage first.
    """
    if not isinstance(key, str) and len(key) > 1:
        raise NotImplementedError("The dataset contains multiple variables; "
                                  "this feature has not been implemented yet. "
                                  "Please use datasets with single variables "
                                  "only.")
    if attrs is None:
        attrs = {}
    if storage_string is None:
        storage_string = build_storage_string(key, attrs)
    path = create_hierarchical_directory(storage_string)
    tic(key, otype="load")
    ds = read_dataset(path)
    ds = deserialize_norm_attrs(ds)
    ds = ds.assign_attrs(attrs)
    ds[key] = ds[key].assign_attrs(attrs)
    ds = auto_chunk(ds)
    toc()
    return ds

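# Illustrative usage (a sketch; key, attributes, and storage string are
# made up):
#
#     ds = load_dataset("density", {"spec": "ions"})
#     # or, with a storage string found via get_storage_metadata():
#     ds = load_dataset("density", storage_string="level_2/spec-ions/density")
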
def get_storage_metadata():
    """Return a hierarchy object that represents the data that is stored."""
    path = expand_storage_path().absolute()
    hs = create_hierarchy_from_storage(path)
    return hs

def entry_exists(key: str, attrs: dict = None, storage_string: str = None):
    """
    Return True if an entry exists in the storage.

    Takes the key and attribute dict, or alternatively the key and storage
    string.
    """
    if attrs is None:
        attrs = {}
    if storage_string is None:
        try:
            storage_string = build_storage_string(key, attrs)
        except Exception:
            return False
    hs = get_storage_metadata()
    found_entries = hs.search_for(storage_string)
    return any(storage_string == fe for fe in found_entries)

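# Illustrative usage (hypothetical key and storage string):
#
#     entry_exists("density", {"spec": "ions"})
#     entry_exists("density", storage_string="level_2/spec-ions/density")
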
def valid_entry_exists(key: str, attrs: dict = None, storage_string: str = None):
    """
    Return True if a valid entry exists in the storage.

    Takes the key and attribute dict, or alternatively the key and storage
    string.
    """
    if attrs is None:
        attrs = {}
    exists = entry_exists(key, attrs, storage_string)
    if not exists:
        return False
    if integrity_table is None:
        check_storage_integrity()
    if storage_string is None:
        storage_string = build_storage_string(key, attrs)
    valid = integrity_table[storage_string]
    return exists and valid

def check_storage_integrity():
    """
    Perform an integrity check of the entire storage.

    Stores the result in an internal integrity table for later use.
    """
    global integrity_table
    keys = get_storage_metadata().get_all_keys()
    tic("storage", otype="integrity check")
    # NOTE: We convert warnings to errors temporarily so we can catch them
    # with try/except
    warnings.simplefilter("error")
    integrity_table = {}
    warning_keys = []
    failed_keys = []
    for key in keys:
        try:
            path = create_hierarchical_directory(key)
            # Perform the check by executing a read with the perform_check
            # option
            read_dataset(path, perform_check=True)
            valid = True
        except UserWarning:
            warning_keys.append(key)
            valid = True
        except Exception:
            failed_keys.append(key)
            valid = False
        integrity_table[key] = valid
    warnings.simplefilter("default")
    toc()
    if len(warning_keys) > 0:
        keys_to_print = "\n".join(warning_keys)
        warnings.warn("Some entries in the storage contain no checksum "
                      "attribute, which means they were stored with an older "
                      "version of the library. These were treated as ok. "
                      f"Keys with warnings: {keys_to_print}", UserWarning)
    if len(failed_keys) > 0:
        keys_to_print = "\n".join(failed_keys)
        warnings.warn("Some entries in the storage are corrupt. "
                      f"Invalid keys: {keys_to_print}", UserWarning)
    return integrity_table

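# Illustrative usage (a sketch; key and attribute are made up): check
# validity before loading so that corrupt entries are caught early. The
# first call triggers a full integrity check and caches the result.
#
#     if valid_entry_exists("density", {"spec": "ions"}):
#         ds = load_dataset("density", {"spec": "ions"})
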