Tags: python, pandas, dask, dask-distributed, dask-delayed

Dask hanging when called from command prompt


I have a program that runs as expected in a Jupyter Notebook cell, but fails/hangs when put into a Python file and called from either a Jupyter Notebook or the command line.

Here is the test code:

    import pandas as pd
    df = pd.DataFrame({'col1':[1,2,3,4],'col2':[5,5,5,5]})

    def B(df):
        df['col3'] = df['col1'] + 100
        return df

    def A(df):
        from dask import delayed, compute
        import numpy as np
        from dask.distributed import Client, LocalCluster

        if __name__ == '__main__':
            cluster = LocalCluster(n_workers = 2)
            client = Client(cluster)

            results_dfs = []
            df_split = np.array_split(df, 2)

            for split in df_split:
                results_dfs.append(delayed(B)(split))

            result = delayed(pd.concat)(results_dfs)
            result = result.compute()

            client.close()
            cluster.close()
    
        return result
    
    result = A(df)
    result

Here is the expected result that I receive when running the code:

[screenshot: expected result, a DataFrame with col1, col2, and the computed col3]

However, when I save this code in a file and then call it from a Jupyter Notebook using `%run -i "path-to-code\test.py"`, the cell runs forever.

When I try to call the same program file from the command prompt I get the following errors:

Traceback (most recent call last):  
  File "<string>", line 1, in <module>  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\multiprocessing\spawn.py", line 116, in spawn_main  
    exitcode = _main(fd, parent_sentinel)  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\multiprocessing\spawn.py", line 125, in _main  
    prepare(preparation_data)  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\multiprocessing\spawn.py", line 236, in prepare  
    _fixup_main_from_path(data['init_main_from_path'])  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\multiprocessing\spawn.py", line 287, in _fixup_main_from_path  
    main_content = runpy.run_path(main_path,  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\runpy.py", line 268, in run_path  
    return _run_module_code(code, init_globals, run_name,  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\runpy.py", line 97, in _run_module_code  
    _run_code(code, mod_globals, init_globals,  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\runpy.py", line 87, in _run_code  
    exec(code, run_globals)  
  File "C:\User\Desktop\Company Stuff\Code\test.py", line 31, in <module>  
    result = A(df)  
  File "C:\User\Desktop\Company Stuff\Code\test.py", line 29, in A  
    return result  
UnboundLocalError: local variable 'result' referenced before assignment  
Task exception was never retrieved  
future: <Task finished name='Task-12' coro=<_wrap_awaitable() done, defined at C:\User\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py:678> exception=ImportError("cannot import name 'Popen' from partially initialized module 'multiprocessing.popen_spawn_win32' (most likely due to a circular import) (C:\\User\\AppData\\Local\\Programs\\Python\\Python39\\lib\\multiprocessing\\popen_spawn_win32.py)")>  
Traceback (most recent call last):  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py", line 685, in _wrap_awaitable  
    return (yield from awaitable.__await__())  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\site-packages\distributed\core.py", line 284, in _  
    await self.start()  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\site-packages\distributed\nanny.py", line 295, in start  
    response = await self.instantiate()  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\site-packages\distributed\nanny.py", line 378, in instantiate  
    result = await self.process.start()  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\site-packages\distributed\nanny.py", line 575, in start  
    await self.process.start()  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\site-packages\distributed\process.py", line 33, in _call_and_set_future  
    res = func(*args, **kwargs)  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\site-packages\distributed\process.py", line 203, in _start  
    process.start()  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\multiprocessing\process.py", line 121, in start  
    self._popen = self._Popen(self)  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\multiprocessing\context.py", line 224, in _Popen  
    return _default_context.get_context().Process._Popen(process_obj)  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\multiprocessing\context.py", line 326, in _Popen  
    from .popen_spawn_win32 import Popen  
ImportError: cannot import name 'Popen' from partially initialized module   'multiprocessing.popen_spawn_win32' (most likely due to a circular import) (C:\User\AppData\Local\Programs\Python\Python39\lib\multiprocessing\popen_spawn_win32.py)  
tornado.application - ERROR - Exception in callback <bound method Nanny.memory_monitor of <Nanny: None, threads: 2>>  
Traceback (most recent call last):  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\site-packages\tornado\ioloop.py", line 905, in _run  
    return self.callback()  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\site-packages\distributed\nanny.py", line 414, in memory_monitor  
    process = self.process.process  
AttributeError: 'NoneType' object has no attribute 'process'

The last 7 lines of the log then repeat infinitely until I interrupt the program. Why does this program not run when placed in its own file?

I'm running Windows 10 and Python 3.9; here are the packages in my environment:

Package           Version
----------------- --------
asgiref           3.3.0
astroid           2.4.2
bokeh             2.2.3
click             7.1.2
cloudpickle       1.6.0
colorama          0.4.4
dask              2021.2.0
distributed       2021.2.0
Django            3.1.4
et-xmlfile        1.0.1
fsspec            0.8.5
HeapDict          1.0.1
isort             5.6.4
jdcal             1.4.1
Jinja2            2.11.2
lazy-object-proxy 1.4.3
locket            0.2.0
MarkupSafe        1.1.1
mccabe            0.6.1
msgpack           1.0.2
numpy             1.19.3
openpyxl          3.0.5
packaging         20.8
pandas            1.1.5
partd             1.1.0
Pillow            8.0.1
pip               21.0.1
psutil            5.8.0
pylint            2.6.0
pyparsing         2.4.7
python-dateutil   2.8.1
pytz              2020.4
PyYAML            5.3.1
reportlab         3.5.59
setuptools        49.2.1
six               1.15.0
sortedcontainers  2.3.0
sqlparse          0.4.1
tblib             1.7.0
toml              0.10.2
toolz             0.11.1
tornado           6.1
typing-extensions 3.7.4.3
wrapt             1.12.1
xlrd              2.0.1
zict              2.0.0

Solution

  • I think it's just an indentation error: the final `return result` sits outside the `if __name__ == '__main__':` block. On Windows, `LocalCluster` spawns worker processes that re-import your script, and in those processes `__name__` is not `'__main__'`, so `A(df)` skips the whole block and hits `return result` with `result` never assigned — hence the `UnboundLocalError`. Correcting the indentation, it works. You might also want to use context managers to ensure that the client/cluster close properly:

    import pandas as pd
    df = pd.DataFrame({'col1':[1,2,3,4],'col2':[5,5,5,5]})
    
    def B(df):
        df['col3'] = df['col1'] + 100
        return df
    
    def A(df):
        from dask import delayed, compute
        import numpy as np
        from dask.distributed import Client, LocalCluster
    
        if __name__ == '__main__':
            with LocalCluster(n_workers = 2) as cluster, Client(cluster) as client:
                results_dfs = []
                df_split = np.array_split(df, 2)
    
                for split in df_split:
                    results_dfs.append(delayed(B)(split))
    
                result = delayed(pd.concat)(results_dfs)
                result = result.compute()
    
            return result
    
    result = A(df)
    if __name__ == '__main__':
        print(result)
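As an aside, a more conventional layout on Windows is to keep the `if __name__ == '__main__':` guard at module level rather than inside a function, so the spawned worker processes (which re-import the script) can load it without side effects. A minimal sketch of that structure, equivalent to the corrected script above:

```python
import pandas as pd
import numpy as np
from dask import delayed
from dask.distributed import Client, LocalCluster

def B(df):
    # Runs on a worker: add a derived column to one partition.
    df['col3'] = df['col1'] + 100
    return df

def A(df):
    # Split the frame, apply B to each piece lazily, then concat.
    parts = [delayed(B)(split) for split in np.array_split(df, 2)]
    # compute() uses the active distributed client if one exists,
    # otherwise falls back to a local scheduler.
    return delayed(pd.concat)(parts).compute()

if __name__ == '__main__':
    # Everything that starts processes stays under the guard, so the
    # spawned workers can import this module safely.
    df = pd.DataFrame({'col1': [1, 2, 3, 4], 'col2': [5, 5, 5, 5]})
    with LocalCluster(n_workers=2) as cluster, Client(cluster) as client:
        print(A(df))
```

This way nothing at import time creates a cluster, which is exactly what the spawn-based multiprocessing on Windows requires.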