I'm trying to await a Python coroutine in Rust. I'm using pyo3-asyncio to convert the coroutine into a Rust Future, and tokio::spawn to create a new asynchronous task in which the conversion is done and awaited.
More specifically, I pass an arbitrary Python async function into a function defined using pyo3. In Rust I spawn a new task, convert the coroutine into a Future, and await it. I would expect this to run every task to completion.
use pyo3::prelude::*;

#[pyfunction]
fn run(py: Python, py_callable: Py<PyAny>) -> PyResult<()> {
    pyo3_asyncio::tokio::run(py, async move {
        // The conversion and the await both happen inside a freshly spawned task.
        let handle = tokio::spawn(async move {
            let future = Python::with_gil(|py| {
                let awaitable = py_callable.call1(py, ()).unwrap();
                pyo3_asyncio::tokio::into_future(awaitable.as_ref(py)).unwrap()
            });
            let future_result = future.await.unwrap();
            println!("Got from future: {:?}", future_result);
        });
        handle.await.unwrap();
        Ok(())
    }).unwrap();
    PyResult::Ok(())
}

#[pymodule]
fn my_module(_: Python<'_>, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(run, m)?)?;
    Ok(())
}
pyo3 = "0.20.0"
pyo3-asyncio = "0.20.0"
tokio = "1.13"
I built using maturin, then in Python:
import asyncio

import my_module


async def foo() -> int:
    print("running Python")
    await asyncio.sleep(1)
    print("Done in Python")
    return 3


if __name__ == "__main__":
    my_module.run(foo)
Running this example results in an error when calling pyo3_asyncio::tokio::into_future: "called Result::unwrap() on an Err value: PyErr { type: <class 'RuntimeError'>, value: RuntimeError('no running event loop'), traceback: None }"
From the documentation, it seems that asynchronous tasks need to have the event loop provided to them somehow, but I can't figure out how.
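As far as I can tell, the message itself is the one Python's asyncio.get_running_loop() raises when the calling thread has no running loop. The sketch below reproduces it in plain Python, without pyo3 or tokio; the helper name probe_running_loop is made up for illustration, and this only shows where the message can come from, not what pyo3-asyncio does internally.

```python
import asyncio
import threading


def probe_running_loop():
    """Report whether the calling thread has a running asyncio event loop."""
    try:
        asyncio.get_running_loop()
        return "loop found"
    except RuntimeError as exc:
        return f"RuntimeError: {exc}"


async def main():
    # Inside a coroutine driven by asyncio.run, a loop is running on this thread.
    inside = probe_running_loop()

    # A plain worker thread has no loop of its own, so the lookup fails there.
    results = []
    worker = threading.Thread(target=lambda: results.append(probe_running_loop()))
    worker.start()
    worker.join()  # briefly blocks the loop; acceptable for a tiny demo
    return inside, results[0]


if __name__ == "__main__":
    print(asyncio.run(main()))
```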
If tokio::spawn is removed, everything works as expected:
#[pyfunction]
fn run_no_task(py: Python, py_callable: Py<PyAny>) -> PyResult<()> {
    pyo3_asyncio::tokio::run(py, async move {
        let future = Python::with_gil(|py| {
            let awaitable = py_callable.call1(py, ()).unwrap();
            pyo3_asyncio::tokio::into_future(awaitable.as_ref(py)).unwrap()
        });
        let future_result = future.await.unwrap();
        println!("Got from future: {:?}", future_result);
        Ok(())
    }).unwrap();
    PyResult::Ok(())
}
I'm clearly misunderstanding something here. How should I pass the Python event loop to a new task?
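You need to save the TaskLocals and use them. A task created with tokio::spawn does not inherit the task locals (the Python event loop and context) that pyo3_asyncio::tokio::run set up, so into_future falls back to asking asyncio for the running loop and fails. Capture the locals with get_current_locals before spawning, then wrap the spawned future in pyo3_asyncio::tokio::scope: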
#[pyfunction]
fn run(py: Python, py_callable: Py<PyAny>) -> PyResult<()> {
    pyo3_asyncio::tokio::run(py, async move {
        let task_locals = Python::with_gil(|py| pyo3_asyncio::tokio::get_current_locals(py))?;
        let handle = tokio::spawn(pyo3_asyncio::tokio::scope(task_locals, async move {
            let future = Python::with_gil(|py| {
                let awaitable = py_callable.call1(py, ()).unwrap();
                pyo3_asyncio::tokio::into_future(awaitable.as_ref(py)).unwrap()
            });
            let future_result = future.await.unwrap();
            println!("Got from future: {:?}", future_result);
        }));
        handle.await.unwrap();
        Ok(())
    })
    .unwrap();
    PyResult::Ok(())
}
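The same "capture the loop where it is running, then hand it over explicitly" idea can be sketched in plain Python as a rough analogy. This is not how pyo3-asyncio is implemented: the names worker and main are illustrative, a thread stands in for the spawned tokio task, and the explicitly passed loop object plays the role of the TaskLocals.

```python
import asyncio
import threading


async def foo():
    await asyncio.sleep(0.01)
    return 3


def worker(loop, out):
    # This thread has no running loop, so the coroutine is submitted to the
    # loop that the caller captured and passed in explicitly.
    future = asyncio.run_coroutine_threadsafe(foo(), loop)
    out.append(future.result(timeout=5))


async def main():
    loop = asyncio.get_running_loop()  # capture while the loop is running
    out = []
    thread = threading.Thread(target=worker, args=(loop, out))
    thread.start()
    # Join from the default executor so the event loop stays free to run foo().
    await loop.run_in_executor(None, thread.join)
    return out[0]


if __name__ == "__main__":
    print("Got from future:", asyncio.run(main()))
```

If worker tried to look up the loop itself instead of receiving it, it would fail with the same "no running event loop" error, which mirrors what happens in the spawned tokio task when the locals are not passed along with scope.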