So I have 2 containers with dask workers setup. I'll refer to them as
main
which has 4 workers and the full applicationremote
which has another 4 workers, the full app's code but isn't running the full app, just the workers code.I'm passing a dictionary with settings to each worker from the main
container through a setup function callback using register_worker_callbacks()
I do it like so:
await client.register_worker_callbacks(lambda: setup_worker(log_config, settings))
And the setup function
def setup_worker(log_config, settings_object):
setup_logging(log_config)
settings.__dict__.update(settings_object.__dict__)
Both containers have this piece of code so it works fine, no problems.
BUT I also have a cronjob running daily which retrieves a new settings file (in JSON) from an external source and from, settings
object on the main
container is reloaded / updated.
This newly updated settings
object needs to propagate and update the settings on the workers too, basically I need a way to call the last line of the setup_worker
function again, doing the exact same thing. But since the workers are already registered and connected at this point, I can't re-use the same callback, can I? How would I achieve the same?
It worked simply by using
await client.run(func, args)
as documented here
It takes a callable / function and executes it on all workers without using the scheduler by default. So to update the same settings which are usually loaded by a callback, you can just call that same function with the same arguments here.
So I ended up with 2 functions like so:
def setup_worker(log_config, settings_object):
setup_logging(log_config)
update_settings(settings_object)
def update_settings(settings_object):
settings.__dict__.update(settings_object.__dict__)
The setup callback is set up the same way as before
await client.register_worker_callbacks(lambda: setup_worker(log_config, settings))
But I have an additional function being called when the settings are updated on the main
container like so:
def update_settings():
...
await client.run(update_settings, settings)