Source code for torx.performance.dask_config_m
"""Contains functionality to configure dask for efficient parallelization."""
import dask
from dask import config
from pathlib import Path
from torx.autodoc_decorators_m import autodoc_function
# Default location for dask's on-disk spill data (relative, i.e. resolved
# against the current working directory). Rebound by ``set_spill_directory``.
spill_directory: Path = Path(".spill")
[docs]
@autodoc_function
def set_spill_directory(path: Path) -> None:
    """
    Set the directory where dask will temporarily spill to disk.

    This is used if available memory is becoming a bottleneck. The new
    location is stored in the module-level ``spill_directory`` (as a ``Path``)
    and in dask's global ``temporary-directory`` configuration entry.

    Parameters
    ----------
    path
        Target directory. A ``str`` or any other path-like object is accepted
        as well as a ``Path``.
    """
    global spill_directory
    # ``Path(path)`` already accepts str and path-like input; the previous
    # ``Path(str(path))`` would silently turn arbitrary objects into a path.
    spill_directory = Path(path)
    # Called outside a ``with`` block, ``config.set`` changes dask's global
    # configuration for the rest of the process. Store a plain string: dask
    # expects a string here, and ``dask.config.serialize`` (JSON-based, used
    # when forwarding config to other processes) cannot encode ``Path`` objects.
    config.set(temporary_directory=str(spill_directory))
[docs]
@autodoc_function
def set_dask_defaults() -> None:
    """
    Set useful default values for important dask parameters.

    This is based on experience. These values might need adjustments from case
    to case; the implementation here should provide a guideline for what to
    set and how to set it.

    All values are written to dask's global configuration (``config.set`` is
    called outside a ``with`` block), so they persist for the rest of the
    process. The temporary/spill directory is taken from the module-level
    ``spill_directory``, which ``set_spill_directory`` can change.
    """
    config.set(
        {
            # NOTE(review): "interface" is not a standard top-level dask key;
            # presumably it is read elsewhere (e.g. when a cluster is created)
            # -- confirm it is actually consumed.
            "interface": "lo",
            "distributed.comm.timeouts.tcp": "60s",
            # Worker memory thresholds, as fractions of the worker memory
            # limit. target and spill are set to the same fraction.
            "distributed.worker.memory.target": 0.5,
            "distributed.worker.memory.spill": 0.5,
            "distributed.worker.memory.pause": 0.9,
            "distributed.worker.memory.terminate": 0.99,
            "distributed.scheduler.worker-saturation": 1,
            "distributed.scheduler.worker-ttl": "15 minutes",
            "array.slicing.split_large_chunks": False,
            "dataframe.shuffle.method": "disk",
        },
        # Stored as a plain string rather than a ``Path``: dask expects a
        # string here and its JSON-based config serialization cannot encode
        # ``Path`` objects.
        temporary_directory=str(spill_directory),
    )