
Dask not starting workers


I am trying to use Dask to perform a groupby operation on a DataFrame. The code below does not work, but it seems that if I initialize the Client from another console the code does work, even though I can't see anything on the dashboard (http://localhost:8787/status): I mean, there is a dashboard, but all the figures look empty. I am on macOS. Code:

from datetime import datetime
import numpy as np
import os
from dask import dataframe as dd
from dask.distributed import Client
import pandas as pd

client = Client()
# open http://localhost:8787/status

csv_path = 'chicago-complete.monthly.2018-07-01-to-2018-07-31/data.csv'
dir_destination = 'data'


df = dd.read_csv(csv_path,
    dtype={
        'timestamp': str,
        'node_id': str,
        'subsystem': str,
        'sensor': str,
        'parameter': str,
        'value_raw': str,
        'value_hrf': str,
    },
    parse_dates=['timestamp'],
    date_parser=lambda x: pd.datetime.strptime(x, '%Y/%m/%d %H:%M:%S')
)


#%%
if not os.path.exists(dir_destination):
    os.makedirs(dir_destination)

def create_node_csv(df_node):
    # test function
    return len(df_node)

res = df.groupby('node_id').apply(create_node_csv, meta=int)

The CSV file is simply composed of columns of strings. My goal is to group all the rows that contain a certain value in a column and then save each group as a separate file using create_node_csv(df_node) (even though right now it is a dummy function). Any other way to do it is appreciated, but I would like to understand what's going on here.

When I run it, the console prints the following errors multiple times:

tornado.application - ERROR - Multiple exceptions in yield list
Traceback (most recent call last):
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 883, in callback
    result_list.append(f.result())
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/anaconda3/lib/python3.7/site-packages/distributed/deploy/local.py", line 208, in _start_worker
    yield w._start()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 157, in _start
    response = yield self.instantiate()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 226, in instantiate
    self.process.start()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 370, in start
    yield self.process.start()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/anaconda3/lib/python3.7/site-packages/distributed/process.py", line 35, in _call_and_set_future
    res = func(*args, **kwargs)
  File "/anaconda3/lib/python3.7/site-packages/distributed/process.py", line 184, in _start
    process.start()
  File "/anaconda3/lib/python3.7/multiprocessing/process.py", line 112, in start
    self._popen = self._Popen(self)
  File "/anaconda3/lib/python3.7/multiprocessing/context.py", line 291, in _Popen
    return Popen(process_obj)
  File "/anaconda3/lib/python3.7/multiprocessing/popen_forkserver.py", line 35, in __init__
    super().__init__(process_obj)
  File "/anaconda3/lib/python3.7/multiprocessing/popen_fork.py", line 20, in __init__
    self._launch(process_obj)
  File "/anaconda3/lib/python3.7/multiprocessing/popen_forkserver.py", line 42, in _launch
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "/anaconda3/lib/python3.7/multiprocessing/spawn.py", line 143, in get_preparation_data
    _check_not_importing_main()
  File "/anaconda3/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
    is not going to be frozen to produce an executable.''')
RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

And:

distributed.nanny - WARNING - Worker process 1844 exited with status 1
distributed.nanny - WARNING - Restarting worker

And:

Traceback (most recent call last):
  File "/anaconda3/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
    send_bytes(obj)
  File "/anaconda3/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/anaconda3/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/anaconda3/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
tornado.application - ERROR - Multiple exceptions in yield list
Traceback (most recent call last):
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 883, in callback
    result_list.append(f.result())
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1147, in run
    yielded = self.gen.send(value)
  File "/anaconda3/lib/python3.7/site-packages/distributed/deploy/local.py", line 217, in _start_worker
    raise gen.TimeoutError("Worker failed to start")
tornado.util.TimeoutError: Worker failed to start
tornado.application - ERROR - Multiple exceptions in yield list
Traceback (most recent call last):
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 883, in callback
    result_list.append(f.result())
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1147, in run
    yielded = self.gen.send(value)
  File "/anaconda3/lib/python3.7/site-packages/distributed/deploy/local.py", line 217, in _start_worker

EDIT: Based on the answer:

  • How do I prevent the creation of a new Client if I run the program again?
  • How can I do the following?

def create_node_csv(df_node):
    return len(df_node)

It gives me the following error; is it related to the meta parameter?

ValueError: cannot reindex from a duplicate axis

Solution

  • When you run the script, Client() causes new Dask workers to be spawned, and they also get copies of variables from the original main process. In some cases, this involves re-importing the script in each worker, each of which, of course, then tries to create a Client and a new set of processes.

    The best answer, as is generally the case with anything running in processes, is to use functions and to protect the main execution. The following is one way to do this without changing your one-script structure:

    from datetime import datetime
    import numpy as np
    import os
    from dask import dataframe as dd
    from dask.distributed import Client
    import pandas as pd
    
    csv_path = 'chicago-complete.monthly.2018-07-01-to-2018-07-31/data.csv'
    dir_destination = 'data'
    
    def run():
        client = Client()
    
        df = dd.read_csv(csv_path, ...)
        if not os.path.exists(dir_destination):
            os.makedirs(dir_destination)
        
        def create_node_csv(df_node):
            # test function
            return len(df_node)
        
        res = df.groupby('node_id').apply(create_node_csv, meta=int)
        print(res.compute())
    
    if __name__ == "__main__":
        run()
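
    Note that create_node_csv is still a dummy here. Since the stated goal is to save each node_id group to its own file, here is a minimal sketch of what it could look like (untested against your data; the filename pattern and the use of dir_destination are assumptions):

    def create_node_csv(df_node):
        # df_node is the pandas DataFrame for a single node_id group
        node_id = df_node['node_id'].iloc[0]
        # Hypothetical layout: one CSV per node inside dir_destination
        df_node.to_csv(os.path.join(dir_destination, f'{node_id}.csv'), index=False)
        return len(df_node)  # still returns an int, so meta=int remains valid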
    

    How do I prevent the creation of a new Client if I run the program again?

    In the call to Client() you can include the address of an existing cluster, if you know what that would be. Also, some specific types of deployments (there are a few) may have a concept of the "current cluster".
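
    For example, a minimal sketch of connecting to an already-running scheduler rather than creating a new one (the address below is only a placeholder; use whatever address your running scheduler actually reports, e.g. in its startup log):

    from dask.distributed import Client

    # Connect to an existing scheduler instead of spawning a new LocalCluster.
    # "tcp://127.0.0.1:8786" is an example address, not something to copy verbatim.
    client = Client("tcp://127.0.0.1:8786")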