python asynchronous rust pyo3

Rust PyO3-asyncio; awaiting Python coroutine in spawned tokio::task


I'm trying to await a Python coroutine in Rust. I'm using pyo3-asyncio to convert the coroutine into a Rust Future, and tokio::spawn to create a new asynchronous task in which the conversion is done and the Future is awaited.

More specifically, I pass an arbitrary Python async function into a function defined using pyo3. In Rust, I spawn a new task, convert the coroutine into a Future, and await it. I would expect this to run every task to completion.

#[pyfunction]
fn run(py: Python, py_callable: Py<PyAny>) -> PyResult<()> {
    pyo3_asyncio::tokio::run(py, async move {
        let handle = tokio::spawn(async move {
            let future = Python::with_gil(|py| {
                let awaitable = py_callable.call1(py, ()).unwrap();
                pyo3_asyncio::tokio::into_future(awaitable.as_ref(py)).unwrap()
            });
            let future_result = future.await.unwrap();
            println!("Got from future: {:?}", future_result);
        });
        handle.await.unwrap();
        Ok(())
    }).unwrap();
    PyResult::Ok(())
}

#[pymodule]
fn my_module(_: Python<'_>, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(run, m)?)?;
    Ok(())
}

pyo3 = "0.20.0"
pyo3-asyncio = "0.20.0"
tokio = "1.13"

I built using maturin, then in Python:

import asyncio

import my_module

async def foo() -> int:
    print("running Python")
    await asyncio.sleep(1)
    print("Done in Python")
    return 3
    

if __name__ == "__main__":
    my_module.run(foo)

Running this example results in an error at the call to pyo3_asyncio::tokio::into_future: "called Result::unwrap() on an Err value: PyErr { type: <class 'RuntimeError'>, value: RuntimeError('no running event loop'), traceback: None }"

From the documentation, it seems that spawned asynchronous tasks need to have the event loop provided to them somehow, but I can't figure out how.

If tokio::spawn is removed, everything works as expected:

#[pyfunction]
fn run_no_task(py: Python, py_callable: Py<PyAny>) -> PyResult<()> {
    pyo3_asyncio::tokio::run(py, async move {
        let future = Python::with_gil(|py| {
            let awaitable = py_callable.call1(py, ()).unwrap();
            pyo3_asyncio::tokio::into_future(awaitable.as_ref(py)).unwrap()
        });
        let future_result = future.await.unwrap();
        println!("Got from future: {:?}", future_result);
        Ok(())
    }).unwrap();
    PyResult::Ok(())
}

I'm clearly misunderstanding something here; how should I pass the Python event loop to a new task?


Solution

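  • pyo3_asyncio::tokio::run stores the Python event loop in task-local data
    (TaskLocals) for the future it drives. A task created with tokio::spawn
    does not inherit that data, so into_future finds no running event loop
    there. Save the TaskLocals before spawning and wrap the spawned future in
    pyo3_asyncio::tokio::scope: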

    #[pyfunction]
    fn run(py: Python, py_callable: Py<PyAny>) -> PyResult<()> {
        pyo3_asyncio::tokio::run(py, async move {
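            // Capture the task locals (Python event loop and context) while still
            // inside the future driven by pyo3_asyncio::tokio::run.
            let task_locals = Python::with_gil(|py| pyo3_asyncio::tokio::get_current_locals(py))?;
            // tokio::spawn starts a task without those locals, so re-enter them with scope.
            let handle = tokio::spawn(pyo3_asyncio::tokio::scope(task_locals, async move {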
                let future = Python::with_gil(|py| {
                    let awaitable = py_callable.call1(py, ()).unwrap();
                    pyo3_asyncio::tokio::into_future(awaitable.as_ref(py)).unwrap()
                });
                let future_result = future.await.unwrap();
                println!("Got from future: {:?}", future_result);
            }));
            handle.await.unwrap();
            Ok(())
        })
        .unwrap();
        PyResult::Ok(())
    }
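
    The same failure mode can be reproduced in pure Python, which may help build intuition. The sketch below is only an analogy, not pyo3-asyncio code: a plain thread stands in for the task created by tokio::spawn, and passing the loop object explicitly plays the role that TaskLocals plays in Rust. The names worker_without_loop and worker_with_loop are hypothetical; foo is a shortened version of the coroutine from the question.

```python
import asyncio
import threading


async def foo() -> int:
    # Shortened stand-in for the coroutine in the question.
    await asyncio.sleep(0.01)
    return 3


def worker_without_loop(out: dict) -> None:
    # This thread has no running event loop, so the lookup fails,
    # much like into_future inside a bare tokio::spawn.
    try:
        asyncio.get_running_loop()
    except RuntimeError as exc:
        out["error"] = str(exc)


def worker_with_loop(loop: asyncio.AbstractEventLoop, out: dict) -> None:
    # The loop is handed over explicitly, so the coroutine can be
    # scheduled on it from outside and its result waited for.
    future = asyncio.run_coroutine_threadsafe(foo(), loop)
    out["result"] = future.result(timeout=5)


async def main() -> dict:
    out: dict = {}
    loop = asyncio.get_running_loop()  # capture while still on the loop
    jobs = ((worker_without_loop, (out,)), (worker_with_loop, (loop, out)))
    for target, args in jobs:
        thread = threading.Thread(target=target, args=args)
        thread.start()
        # Join via the default executor so the event loop stays free to run foo().
        await loop.run_in_executor(None, thread.join)
    return out


outcome = asyncio.run(main())
print(outcome)
```

    The first worker records the same 'no running event loop' message seen in the question, while the second one gets the coroutine's return value because it was given the loop captured in main. In the Rust answer, get_current_locals does the capturing and scope does the handing over.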