# Source code for torx.performance.dask_client_m

"""
Contains functionality to start a dask/distributed client.

Allows controlling the amount of resources used by dask.
"""
import dask
from dask import config
from pathlib import Path
from distributed import LocalCluster, Client

from torx.autodoc_decorators_m import autodoc_function
from .networking_m import get_next_free_port
from .system_information_m import get_mem_avail, get_num_cores_phys
from .byte_tools_m import convert_to_bytes
from .scheduler_plugin_m import TorxSchedulerPlugin

@autodoc_function
def open_new_client(mem: str = None, cores: int = None):
    """
    Open a new dask client.

    Creates a new client on the next available port using a scheduler
    plugin to print task information while computing. Memory and number
    of cores (threads) can be specified. Defaults to 1/4 of system
    memory and 1/16 of system cores.

    Save the client for future reference in the script. You can call
    restart or shutdown to control its state. Further you can gain
    debug information with get_scheduler_logs, get_worker_logs.

    Parameters
    ----------
    mem : str, optional
        Memory limit for the worker as a human-readable size string
        (e.g. "4 GB"). Must be smaller than the available system memory.
        Defaults to one quarter of the available system memory.
    cores : int, optional
        Number of threads for the worker. Must not exceed the number of
        physical cores. Defaults to 1/16 of the physical cores (at
        least 1).

    Returns
    -------
    distributed.Client
        The newly created client, connected to a single-worker
        LocalCluster with the TorxSchedulerPlugin registered.

    Raises
    ------
    ValueError
        If ``mem`` exceeds available system memory or ``cores`` exceeds
        the number of physical cores.
    """
    open_port = get_next_free_port()

    if mem is None:
        # Default: one quarter of currently available memory, as "X GB".
        mem_to_use = int(get_mem_avail() // 4)
        mem_to_use = str(mem_to_use / 1e9) + " GB"
    else:
        # Validate the requested limit against what the system can offer.
        # Raise (not assert) so the check survives `python -O`.
        mem_numeric = convert_to_bytes(mem)
        mem_avail = get_mem_avail()
        if mem_numeric >= mem_avail:
            raise ValueError(
                "Memory must be smaller than system memory available ("
                + f"given: {mem_numeric}, system: {mem_avail})!")
        mem_to_use = mem

    if cores is None:
        # Default: 1/16 of the physical cores, but never less than one.
        num_threads = max(get_num_cores_phys() // 16, 1)
    else:
        cores_avail = get_num_cores_phys()
        if cores > cores_avail:
            raise ValueError(
                "Number of threads must not be larger than number of "
                + f"available cores (given: {cores}, system: {cores_avail})!")
        num_threads = cores

    # One multi-threaded worker in a separate process, pinned to the
    # free port found above.
    cluster = LocalCluster(n_workers=1,
                           threads_per_worker=num_threads,
                           memory_limit=mem_to_use,
                           processes=True,
                           scheduler_port=open_port)
    client = Client(cluster)

    # Register the scheduler plugin that prints task information.
    plugin = TorxSchedulerPlugin()
    client.register_plugin(plugin)
    return client