The following code works fine on Python 3.13, but fails on Python 3.14 with a RuntimeError related to asyncio tasks.
If I switch the multiprocessing start method from "fork" to "spawn", the code works again — but "spawn" is too slow for some use cases.
Is there another way to make this work under Python 3.14 without changing the start method?
```python
import asyncio
import inspect
from functools import wraps
from typing import Any, Awaitable, Callable, Union

import pytest
from multiprocess import Pipe, Process
from multiprocess.connection import Connection
import multiprocess as mp

mp.set_start_method(method="fork", force=True)  # "spawn" works fine


class SubprocessError:
    def __init__(self, ex: Exception) -> None:
        self.exception = ex


def in_subprocess[T](func: Callable[..., Union[T, Awaitable[T]]]) -> Callable[..., Awaitable[T]]:
    @wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> T:
        return await calculate_in_subprocess(func, *args, **kwargs)
    return wrapper


async def calculate_in_subprocess[T](func: Callable[..., Union[T, Awaitable[T]]], *args: Any, **kwargs: Any) -> T:
    rx, tx = Pipe(duplex=False)  # receiver & transmitter; Pipe is one-way only
    process = Process(target=_inner, args=(tx, func, *args), kwargs=kwargs)
    process.start()

    event = asyncio.Event()
    loop = asyncio.get_event_loop()
    loop.add_reader(fd=rx.fileno(), callback=event.set)
    if not rx.poll():  # do not use process.is_alive() as condition here
        await event.wait()
    loop.remove_reader(fd=rx.fileno())
    event.clear()

    result = rx.recv()
    process.join()  # this blocks synchronously! make sure the process is terminated before you call join()
    rx.close()
    tx.close()

    if isinstance(result, SubprocessError):
        raise result.exception
    return result


def _inner[T](tx: Connection, fun: Callable[..., Union[T, Awaitable[T]]], *a, **kw_args) -> None:
    event_loop = None
    if inspect.iscoroutinefunction(fun):
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)
    try:
        if event_loop is not None:
            res = event_loop.run_until_complete(fun(*a, **kw_args))
        else:
            res = fun(*a, **kw_args)
    except Exception as ex:
        tx.send(SubprocessError(ex=ex))
    else:
        tx.send(res)


@pytest.mark.asyncio
async def test_in_subprocess_simple_async():
    @in_subprocess
    async def f() -> int:
        return 42

    assert await f() == 42
```
```
-------------------------------- live log call ---------------------------------
ERROR asyncio:base_events.py:1875 Exception in callback <_asyncio.TaskStepMethWrapper object at 0x7e71ba729ff0>()
handle: <Handle <_asyncio.TaskStepMethWrapper object at 0x7e71ba729ff0>()>
Traceback (most recent call last):
  File "/usr/lib/python3.14/asyncio/events.py", line 94, in _run
    self._context.run(self._callback, *self._args)
    ~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
RuntimeError: Cannot enter into task <Task pending name='Task-2' coro=<test_in_subprocess_simple_async.<locals>.f() running at foo.py:73> cb=[_run_until_complete_cb() at /usr/lib/python3.14/asyncio/base_events.py:181]> while another task <Task pending name='Task-1' coro=<test_in_subprocess_simple_async() running at foo.py:77> cb=[_run_until_complete_cb() at /usr/lib/python3.14/asyncio/base_events.py:181]> is being executed.
```
Installed packages (note: multiprocess must be installed from GitHub):
```
certifi==2025.10.5
charset-normalizer==3.4.3
dill==0.4.0
docker==7.1.0
idna==3.10
iniconfig==2.1.0
multiprocess @ git+https://github.com/uqfoundation/multiprocess.git@02ea4bd36cac5013d70847815c92e1a736ef4a05
packaging==25.0
pluggy==1.6.0
Pygments==2.19.2
pytest==8.4.2
pytest-asyncio==1.2.0
pytest_docker_tools==3.1.9
requests==2.32.5
urllib3==2.5.0
```
Why does this RuntimeError occur under Python 3.14 with fork, and is there a way to fix it without switching to spawn or forkserver?
On Python 3.14, I get the following results when running my code:
| Library | Fork | Spawn | Forkserver |
|---|---|---|---|
| multiprocess | ✗ (RuntimeError: Cannot enter into task) | ✓ | ✓ |
| multiprocessing | ✗ (RuntimeError: Cannot enter into task) | ✗ (PicklingError, see below) | ✗ (PicklingError, see below) |
```
    def dump(obj, file, protocol=None):
        '''Replacement for pickle.dump() using ForkingPickler.'''
>       ForkingPickler(file, protocol).dump(obj)
E       _pickle.PicklingError: Can't pickle local object <function test_in_subprocess_simple_async.<locals>.f at 0x7b3f34cfcb40>
E       when serializing tuple item 1
E       when serializing dict item '_args'
E       when serializing multiprocessing.context.Process state
E       when serializing multiprocessing.context.Process object
```
So multiprocessing does not work because the standard pickle module cannot serialize the locally defined test function, while multiprocess relies on dill, which can. With multiprocess, spawn and forkserver work fine. So my question is: is there a way to make fork work as well?
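For reference, a minimal standalone snippet (not taken from my test suite; names are illustrative) showing the difference between the two serializers on a local function:

```python
# Minimal illustration: stdlib pickle refuses local (nested) functions,
# dill serializes them. Names here are illustrative only.
import pickle
import dill

def outer():
    def local() -> int:  # a local function, analogous to the decorated test's f()
        return 42
    return local

f = outer()

try:
    pickle.dumps(f)
except (pickle.PicklingError, AttributeError) as exc:  # exception type differs across Python versions
    print("pickle failed:", exc)

print("dill produced", len(dill.dumps(f)), "bytes")
```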
Before getting to the answer about your actual code:

Is there a reason you are not using a ProcessPoolExecutor instead of doing all of this manually?
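For illustration, a rough sketch of that route (my own helper names, not from your code), with the caveat that the stdlib pool serializes its work items with plain pickle, so the callable must be defined at module level:

```python
# Sketch of the ProcessPoolExecutor route. Caveat: the stdlib pool pickles
# its work items with plain pickle, so the callable must be importable
# (module-level). Helper names are illustrative only.
import asyncio
import inspect
from concurrent.futures import ProcessPoolExecutor
from functools import partial

def _call_maybe_async(func, *args, **kwargs):
    # Runs inside the worker process; a coroutine function gets its own fresh loop.
    if inspect.iscoroutinefunction(func):
        return asyncio.run(func(*args, **kwargs))
    return func(*args, **kwargs)

async def run_in_worker_process(func, *args, **kwargs):
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=1) as pool:
        return await loop.run_in_executor(
            pool, partial(_call_maybe_async, func, *args, **kwargs)
        )
```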
Now, on to what I could find out.

I could not get to the core of why this fails exactly on 3.14, but the forked child inherits the asyncio loop from the parent process, and adding some print statements shows that the subprocess stalls in the .run_until_complete call.
When I changed the code to create a subthread and build a new asyncio loop in that subthread instead, it ran to completion on Python 3.14.

So I suppose some data structures related to the asyncio loop of the parent's main thread are still present in the main thread of the forked process, even though creating a new asyncio loop there does not raise an error.
Checking "What's New" for 3.14, a lot of the inner data structures of the asyncio loop were changed in Python 3.14, so my bet is that one of those is not properly isolated and the asyncio loop in the subprocess picks up values that were created in the parent process (and it works on 3.13 because those inner workings are different).

So we have hit a language bug here. (Or I wonder: why, in a forked process, is the running asyncio loop of the main thread not simply forked along and already running in the child? Probably there is some specialized code for ignoring the asyncio loop created in the parent process that was not properly updated.)
The workaround is to have an intermediate step before "_inner" that runs your actual worker callable in another thread. I did a proof of concept for this, along with my "prints", inside _inner itself:
```python
def _inner[T](tx: Connection, fun: Callable[..., Union[T, Awaitable[T]]], *a, new_thread: bool = False, **kw_args) -> None:
    # new_thread is keyword-only so it cannot swallow a positional argument
    # that is meant for the wrapped function.
    event_loop = None
    if inspect.iscoroutinefunction(fun):
        if not new_thread:
            # Re-enter _inner in a fresh thread; that thread gets a clean
            # asyncio state, untouched by whatever fork copied over.
            import threading
            t = threading.Thread(target=_inner, args=(tx, fun, *a), kwargs=(kw_args | {"new_thread": True}))
            t.start()
            t.join()
            return
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)
        print(event_loop)
    try:
        if event_loop is not None:
            print("WORKING UP TO HERE")
            res = event_loop.run_until_complete(fun(*a, **kw_args))
        else:
            res = fun(*a, **kw_args)
    except Exception as ex:
        print("exception in subprocess", ex)
        tx.send(SubprocessError(ex=ex))
    else:
        tx.send(res)
```
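As a variant of the same idea (just a sketch, not tested against your full setup), the helper thread can let asyncio.run() create and tear down the fresh loop instead of managing it by hand:

```python
# Sketch: run a coroutine function in a new thread with a brand-new loop.
import asyncio
import threading

def _run_coro_in_fresh_thread(fun, *a, **kw):
    result = {}

    def runner():
        # asyncio.run() creates (and closes) a new event loop in this thread,
        # untouched by whatever asyncio state fork copied from the parent.
        result["value"] = asyncio.run(fun(*a, **kw))

    t = threading.Thread(target=runner)
    t.start()
    t.join()
    return result["value"]
```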
(On a side note, remember to change that asyncio.get_event_loop() call in the other function to asyncio.get_running_loop() instead.)