I am using Dask to set up a cluster. For now I am setting up both the scheduler and the workers on localhost:
from dask.distributed import Client, SSHCluster

cluster = SSHCluster(
    ["localhost", "localhost"],
    connect_options={"known_hosts": None},
    worker_options={"n_workers": params["n_workers"]},
    scheduler_options={"port": 0, "dashboard_address": ":8797"},
)
client = Client(cluster)
Is there a way to create stateful global parameters that can be initialized on the workers' side and be used by any worker_methods that are subsequently assigned to be computed on the workers?
I have found the client.register_worker_plugin method.
import json
from dask.distributed import Client, SSHCluster, WorkerPlugin

class ReadOnlyData(WorkerPlugin):
    def __init__(self, jsonfilepath):
        # Runs on the client: the loaded data is pickled together with the plugin.
        with open(jsonfilepath, "r") as readfile:
            self.persons = json.load(readfile)
            self.persons_len = len(self.persons)

def main():
    cluster = SSHCluster(params)  # simplified
    client = Client(cluster)
    plugin = ReadOnlyData(jsonfilepath)
    client.register_worker_plugin(plugin, name="read-only-data")
However, ReadOnlyData is initialized on the client side; hence self.persons and self.persons_len are copied over to the workers (rather than being initialized on the workers' side). While this may be acceptable for small data, a massive data set would incur additional communication overhead to copy over (unless I am missing something conceptually).
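What I would expect to avoid the copy is a variant where only the file path is serialized and the loading happens on each worker, e.g. in the plugin's setup hook. A rough sketch of the idea (ReadOnlyDataOnWorker is just an illustrative name; this is essentially the variant I could not get to work, see the end of the question):

import json
from dask.distributed import WorkerPlugin

class ReadOnlyDataOnWorker(WorkerPlugin):
    def __init__(self, jsonfilepath):
        # Only the (small) path is pickled and shipped to the workers.
        self.jsonfilepath = jsonfilepath

    def setup(self, worker):
        # Called on each worker when the plugin is registered there,
        # so the JSON is loaded worker-side instead of being copied over.
        with open(self.jsonfilepath, "r") as readfile:
            worker.data["customdata"] = json.load(readfile)

# client.register_worker_plugin(ReadOnlyDataOnWorker(jsonfilepath), name="read-only-data")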
Let's say ReadOnlyData and the file at "jsonfilepath" were available on the workers' side. We could call it from "worker_method_1" and "worker_method_2", which contain some arbitrary logic; however, this means the file would be loaded every time a worker method runs (see the sketch below). Is there some "initialization" event/method that runs on the workers' side, right after worker creation and before any worker methods are assigned, that would allow us to pre-load some data structures as stateful global parameters, commonly shared among subsequent invocations of the worker methods?
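The naive alternative, re-reading the file inside every worker method, would look roughly like this (a sketch; load_persons is a hypothetical helper and worker_method_1/worker_method_2 stand in for the actual logic):

import json

def load_persons(jsonfilepath):
    # Re-executed on the worker for every task that needs the data.
    with open(jsonfilepath, "r") as readfile:
        return json.load(readfile)

def worker_method_1(jsonfilepath):
    persons = load_persons(jsonfilepath)  # loaded again here ...
    ...  # arbitrary logic using persons

def worker_method_2(jsonfilepath):
    persons = load_persons(jsonfilepath)  # ... and again here
    ...  # arbitrary logic using persons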
Update
After trying @mdurant's suggestions with a JSON file of 280 MB, the code gets stuck on client.replicate() for over an hour. Loading the file in a single process without Dask takes less than 20 seconds. In the dashboard, the workers were all using approximately 2 GB each, and the memory kept increasing. There was also network activity being recorded.
All of a sudden, the script crashed with the following error:
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.replicate local=tcp://192.168.0.6:38498 remote=tcp://192.168.0.6:46171>: TimeoutError: [Errno 10] Connection timed out
The memory usage is excessive. The only sizeable data I have is the 280 MB JSON: 280 MB × 6 workers should amount to approximately 1.7 GB in total, definitely not 2 GB on each worker.
I suspect the JSON is being copied to all workers. Dask's documentation also states that the data is copied onto the workers:
replicate -> Set replication of futures within network. Copy data onto many workers. This helps to broadcast frequently accessed data and can improve resilience. This performs a tree copy of the data throughout the network individually on each piece of data. This operation blocks until complete. It does not guarantee replication of data to future workers.
Still, this does not explain the excessive memory usage, nor why Dask does not manage to copy 280 MB to 6 workers in less than an hour.
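For context, the scatter/replicate pattern in question looks roughly like this (a generic sketch, not my exact code; load_persons as above):

# The JSON is loaded once on the client, then pushed out to the workers.
data = load_persons(jsonfilepath)        # plain json.load on the client
[future] = client.scatter([data])        # ship the object to the cluster
client.replicate([future])               # tree-copy it to all workers
# Worker methods then take `future` as an argument and Dask resolves it to the data.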
The only way that seems to work is something like this:
import json
from functools import partial
from dask.distributed import Client, SSHCluster, Worker

def read_only_data(dask_worker: Worker, jsonfilepath):
    # Executed on each worker: load the JSON locally and stash it on the worker.
    with open(jsonfilepath, "r") as readfile:
        dask_worker.data["customdata"] = json.load(readfile)

def main():
    cluster = SSHCluster(params)  # simplified
    client = Client(cluster)
    callback = partial(read_only_data, jsonfilepath=jsonfilepath)
    client.register_worker_callbacks(callback)
from dask.distributed import get_worker

def worker_method():
    worker = get_worker()
    custom_data = worker.data["customdata"]
And from what I can see, the read_only_data function is executed on the worker side, as intended. There is an open issue to deprecate register_worker_callbacks in favour of WorkerPlugin; however, I did not manage to get the WorkerPlugin approach to work.
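For completeness, the worker methods are then submitted as usual, e.g. (a sketch; pure=False only because worker_method takes no arguments, so identical calls would otherwise be deduplicated into a single task):

# Submit a few tasks; each one reads the pre-loaded data from its own worker.
futures = [client.submit(worker_method, pure=False) for _ in range(6)]
results = client.gather(futures)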