# Source code for torx.performance.dask_client_m
"""
Contains functionality to start a dask/distributed client.
Allows to control the amount of resources used by dask.
"""
import dask
from dask import config
from pathlib import Path
from distributed import LocalCluster, Client
from torx.autodoc_decorators_m import autodoc_function
from .networking_m import get_next_free_port
from .system_information_m import get_mem_avail, get_num_cores_phys
from .byte_tools_m import convert_to_bytes
from .scheduler_plugin_m import TorxSchedulerPlugin
# [docs]  (Sphinx autodoc anchor artifact from HTML extraction)
@autodoc_function
def open_new_client(mem : str=None, cores : int=None):
    """
    Open a new dask client.

    Creates a new client on the next available port using a scheduler plugin
    to print task information while computing. Memory and number of cores
    (threads) can be specified. Defaults to 1/4 of sys memory and 1/16 of
    sys cores (but at least one thread).

    Save the client for future reference in the script. You can call restart
    or shutdown to control its state. Further you can gain debug information
    with get_scheduler_logs, get_worker_logs.

    Parameters
    ----------
    mem : str, optional
        Memory limit for the worker as a human-readable string
        (e.g. "4 GB"). Must be smaller than the available system memory.
        Defaults to 1/4 of the available system memory.
    cores : int, optional
        Number of threads for the single worker. Must not exceed the
        number of physical cores. Defaults to 1/16 of the physical cores,
        with a minimum of 1.

    Returns
    -------
    distributed.Client
        A client connected to a freshly started single-worker LocalCluster
        with the TorxSchedulerPlugin registered.

    Raises
    ------
    ValueError
        If ``mem`` exceeds available system memory or ``cores`` exceeds
        the number of physical cores.
    """
    open_port = get_next_free_port()

    # Resolve the memory limit: default to a quarter of available memory,
    # otherwise validate the user-supplied value against what the system has.
    if mem is None:
        mem_to_use = int(get_mem_avail() // 4)
        # LocalCluster accepts human-readable strings; "GB" is decimal (1e9).
        mem_to_use = str(mem_to_use / 1e9) + " GB"
    else:
        mem_numeric = convert_to_bytes(mem)
        mem_avail = get_mem_avail()
        # Raise (not assert) so the check survives `python -O`.
        if mem_numeric >= mem_avail:
            raise ValueError(
                "Memory must be smaller than system memory available ("
                + f"given: {mem_numeric}, system: {mem_avail})!")
        mem_to_use = mem

    # Resolve the thread count: default to 1/16 of physical cores (min 1),
    # otherwise validate the user-supplied value against the core count.
    if cores is None:
        num_threads = get_num_cores_phys() // 16
        num_threads = max(num_threads, 1)
    else:
        cores_avail = get_num_cores_phys()
        # Raise (not assert) so the check survives `python -O`.
        if cores > cores_avail:
            raise ValueError(
                "Number of threads must not be larger than number of "
                + f"available cores (given: {cores}, system: {cores_avail})!")
        num_threads = cores

    # Single multi-threaded worker in a separate process on the free port.
    cluster = LocalCluster(n_workers=1,
                           threads_per_worker=num_threads,
                           memory_limit=mem_to_use,
                           processes=True,
                           scheduler_port=open_port)
    client = Client(cluster)

    # Register the scheduler plugin that prints task info during compute.
    plugin = TorxSchedulerPlugin()
    client.register_plugin(plugin)
    return client