After upgrading dask distributed to version 1.15.0, my logging stopped working.
I've been using logging.config.dictConfig
to initialize Python's logging facilities, and previously these settings propagated to all workers. After the upgrade they no longer do.
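For context, the configuration is applied once in the main process, roughly like this (the handler and format details below are simplified placeholders, not my exact settings):

import logging.config

LOGGING_CONFIG = {
    "version": 1,
    "formatters": {
        "default": {"format": "%(asctime)s %(levelname)s %(name)s: %(message)s"},
    },
    "handlers": {
        "console": {"class": "logging.StreamHandler", "formatter": "default"},
    },
    "root": {"handlers": ["console"], "level": "INFO"},
}

# applied once in the main process; before 1.15.0 this also took effect on workers
logging.config.dictConfig(LOGGING_CONFIG)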
If I call dictConfig
right before every log call on every worker it works, but that's not a proper solution.
So the question is: how do I initialize logging on every worker before my computation graph starts executing, and do it only once per worker?
UPDATE:
This hack worked on a dummy example but didn't make a difference on my system:
import distributed

def init_logging():
    # logging initialization happens here
    ...

client = distributed.Client()
# try to run init_logging once per worker by mapping over the worker addresses
client.map(lambda _: init_logging(), client.ncores())
UPDATE 2:
After digging through the documentation, this fixed the problem:
client.run(init_logging)
So the question now is: Is this a proper way to solve this problem?
As of version 1.15.0 we now fork workers from a clean process, so changes that you make to your process prior to calling Client()
won't affect forked workers. For more information, search for forkserver here: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
Your solution of using Client.run
looks good to me. Client.run is currently (as of version 1.15.0) the best way to call a function on all currently active workers.
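A minimal sketch of that pattern, assuming an init_logging function like the one in your question (the basicConfig body here is just a stand-in for your real configuration):

import logging

import distributed

def init_logging():
    # stand-in for your real dictConfig call; runs inside each worker process
    logging.basicConfig(level=logging.INFO)

client = distributed.Client()

# run init_logging once on every currently connected worker
client.run(init_logging)

# tasks submitted afterwards see the configured logging on those workers
futures = client.map(lambda x: x + 1, range(10))
print(client.gather(futures))

Note that this only reaches workers that are active at the time of the call; workers that join later would need the same treatment.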
It is worth noting that here you're setting up workers forked from the same process on a single computer. The trick above will not work in a distributed setting. I'm adding this note in case people come to this question asking how to handle logging with Dask in a cluster context.
Generally, Dask does not move logs around. Instead, it is common for whatever mechanism you used to launch Dask to handle this: job schedulers like SGE/SLURM/Torque/PBS, cloud systems like YARN/Mesos/Marathon/Kubernetes, and the dask-ssh
tool all do this.