python jupyter-notebook python-multiprocessing

Parallelising functions using multiprocessing in Jupyter Notebook


Edit: I updated the question with a trivial repeatable example for ipython, PyCharm and Visual Studio Code. They all fail in a different way.

I am running CPU-intensive tasks in Jupyter Notebook. The task is trivial to parallelise, and I can already do this in a notebook with threads. However, this is inefficient because Python's GIL prevents threads from effectively utilising multiple CPU cores.

The obvious solution would be Python's multiprocessing module, and I have this working in Python application code (not notebooks). However, because of how Jupyter Notebook operates, multiprocessing fails due to the lack of a __main__ entry point.

I do not want to create separate Python modules, because they defeat the purpose of using notebooks for data research in the first place.

Here is the minimum repeatable example.

I create a notebook with a single cell:

# Does not do actual multiprocessing, but demonstrates that it fails in a notebook
from multiprocessing import Process

def task():
    return 2

p = Process(target=task)
p.start()
p.join()

Running this with IPython gives:

ipython notebooks/notebook-multiprocess.ipynb

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/opt/homebrew/Cellar/python@3.10/3.10.13/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/opt/homebrew/Cellar/python@3.10/3.10.13/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/spawn.py", line 125, in _main
    prepare(preparation_data)
  File "/opt/homebrew/Cellar/python@3.10/3.10.13/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/spawn.py", line 236, in prepare
    _fixup_main_from_path(data['init_main_from_path'])
  File "/opt/homebrew/Cellar/python@3.10/3.10.13/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/spawn.py", line 287, in _fixup_main_from_path
    main_content = runpy.run_path(main_path,
  File "/opt/homebrew/Cellar/python@3.10/3.10.13/Frameworks/Python.framework/Versions/3.10/lib/python3.10/runpy.py", line 289, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "/opt/homebrew/Cellar/python@3.10/3.10.13/Frameworks/Python.framework/Versions/3.10/lib/python3.10/runpy.py", line 96, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "/opt/homebrew/Cellar/python@3.10/3.10.13/Frameworks/Python.framework/Versions/3.10/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/Users/moo/code/ts/trade-executor/notebooks/notebook-multiprocess.ipynb", line 5, in <module>
    "execution_count": null,
NameError: name 'null' is not defined

Running this with PyCharm gives:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/opt/homebrew/Cellar/python@3.10/3.10.13/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/opt/homebrew/Cellar/python@3.10/3.10.13/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'task' on <module '__main__' (built-in)>

Running this with Visual Studio Code gives:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/opt/homebrew/Cellar/python@3.10/3.10.13/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/opt/homebrew/Cellar/python@3.10/3.10.13/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'task' on <module '__main__' (built-in)>

My current parallelisation using a thread pool works:

import futureproof
from tqdm import tqdm

results = []

def process_background_job(a, b):
    # Do something for the batch of data and return results
    pass

# If you switch to futureproof.executors.ProcessPoolExecutor
# here it will crash with the above error
executor = futureproof.executors.ThreadPoolExecutor(max_workers=8)
with futureproof.TaskManager(executor, error_policy="log") as task_manager:
    
    # Send individual jobs to the multiprocess worker pool
    total_tasks = 0
    for look_back in look_backs:
        for look_forward in look_forwards:
            task_manager.submit(process_background_job, look_back, look_forward)
            total_tasks += 1

    print(f"Processing grid search {total_tasks} background jobs")

    # Run the background jobs and read back the results from the background worker
    # with a progress bar
    with tqdm(total=total_tasks) as progress_bar:
        for task in task_manager.as_completed():
            if isinstance(task.result, Exception):
                executor.join()
                raise RuntimeError(f"Could not complete task for args {task.args}") from task.result
            
            look_back, look_forward, long_regression, short_regression = task.result
            results.append([
                look_back,
                look_forward,
                long_regression.rsquared,
                short_regression.rsquared
            ])
            progress_bar.update()

How can I use process-based parallelization in notebooks?

Python 3.10, but happy to upgrade if it helps.


Solution

  • You appear to be using macOS, and the problems you are running into arise because forking a process is not fully supported on macOS. As such, multiprocessing on macOS starts subprocesses using the spawn method by default. The following paragraphs describe why the problem occurs. The simple solution is to define the function in a module that can be imported by both the notebook and the worker processes. Alternatively, skip to the bottom for a workaround using cloudpickle.

    When you fork a process (the default method for starting a multiprocessing worker process on Linux before Python 3.14), you get a copy of the memory of the parent process, meaning the worker process can access and call any function that was defined in the __main__ module when it was forked. However, worker processes created with the spawn method start with a blank slate. As such, they must be able to find the function by reference: they import the module the function came from and look the function up by name in that module. If the function was defined in the __main__ module, then that module must be importable by the worker and must be the __main__ that multiprocessing expects.
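To make "by reference" concrete, here is a minimal sketch that pickles a function and inspects the payload. json.dumps is only a stand-in for any importable function; it is not part of the original question.

```python
import json
import multiprocessing
import pickle

# pickle stores a function as "module name + attribute name", not as code.
payload = pickle.dumps(json.dumps)
print(payload)

# Both names appear in the payload; the function body does not.
print(b"json" in payload, b"dumps" in payload)

# Unpickling re-imports the module and looks the name up again,
# so it hands back the very same function object.
restored = pickle.loads(payload)
print(restored is json.dumps)

# The start method this interpreter uses by default on the current platform.
print(multiprocessing.get_start_method())
```

A spawned worker performs the same lookup when it unpickles the target function, which is why a function that exists only in the notebook kernel's __main__ cannot be found.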

    When you start a Jupyter notebook, it launches a new kernel. This kernel is REPL-based rather than source/file-based. As such, the __main__ module is that of the kernel's REPL, not the code that you enter into the cells of the notebook.
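One way to see what a spawned worker would have to work with is to inspect __main__ from a cell. The helper below is a hypothetical diagnostic, not part of multiprocessing. It reads the two attributes the spawn machinery consults when deciding how to rebuild __main__ in the child, namely __spec__.name and __file__. In the IPython traceback in the question, runpy.run_path is called on the .ipynb file itself, which suggests __file__ pointed at the notebook's JSON there. The PyCharm and VS Code tracebacks show a "built-in" __main__, which suggests there was no file at all.

```python
import sys

def describe_main():
    """Report what a spawned worker could use to re-create __main__."""
    main = sys.modules["__main__"]
    spec = getattr(main, "__spec__", None)
    return {
        # Set when the program was started with `python -m package.module`.
        "spec_name": getattr(spec, "name", None),
        # Set when the program was started from a source file.
        "file": getattr(main, "__file__", None),
    }

print(describe_main())
```

If both values are None, the worker has nothing to re-import, and any function pickled by reference to __main__ will fail to load.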

    As it stands, there is no way to force multiprocessing to pick up source defined in a REPL on macOS (or Windows). There is, however, one possibility. If we change the way Python pickles functions, then we can send the function itself to the worker process. cloudpickle is a third-party library that pickles functions in their entirety, rather than by reference. As such, you can monkey-patch cloudpickle into multiprocessing.reduction.ForkingPickler using a reducer_override, so that multiprocessing uses cloudpickle rather than pickle to pickle functions.

    import sys
    from multiprocessing import Pool
    from multiprocessing.reduction import ForkingPickler
    from types import FunctionType
    import cloudpickle
    
    assert sys.version_info >= (3, 8), 'python3.8 or greater required to use reducer_override'
    
    def reducer_override(obj):
        if type(obj) is FunctionType:
            return (cloudpickle.loads, (cloudpickle.dumps(obj),))
        else:
            return NotImplemented
    
    # Monkeypatch our function reducer into the pickler for multiprocessing.
    # Without this line, the main block will not work on Windows or macOS.
    # Alternatively, moving the definition of foo outside of the if statement
    # would make the main block work on Windows or macOS (when run from
    # the command line).
    ForkingPickler.reducer_override = staticmethod(reducer_override)
    
    if __name__ == '__main__':
        def foo(x, y):
            return x * y
        
        with Pool() as pool:
            res = pool.apply(foo, (10, 3))
    
        print(res)
        assert res == 30
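
For the simpler route mentioned at the top of this answer, the module does not have to be maintained separately: a notebook cell can write it out and then import it. The following is a minimal sketch under that assumption. The module name notebook_tasks, the function task and the temporary directory are all hypothetical choices for illustration.

```python
import importlib
import sys
import tempfile
from multiprocessing import get_context
from pathlib import Path

# Write the worker function into a real module file (hypothetical name).
# A fresh temporary directory is used here; any directory on sys.path works.
module_dir = Path(tempfile.mkdtemp())
(module_dir / "notebook_tasks.py").write_text(
    "def task(x):\n"
    "    return x * 2\n"
)

# Make the module importable. Spawned workers receive the parent's sys.path.
sys.path.insert(0, str(module_dir))
importlib.invalidate_caches()
import notebook_tasks

if __name__ == "__main__":
    # Force the spawn start method to mirror the macOS/Windows behaviour.
    ctx = get_context("spawn")
    with ctx.Pool(processes=2) as pool:
        doubled = pool.map(notebook_tasks.task, [1, 2, 3])
    print(doubled)
```

Because task now lives in notebook_tasks rather than in __main__, the worker can import it by name and no change to the pickler is needed. The trade-off is that any edit to the function means rewriting the file and reloading the module.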