multithreading rust rust-tokio rust-tonic

How can I send a message from a standard library spawned thread to a Tokio async task?


I have a setup where my program spawns several threads for CPU-bound computation using std::thread::spawn.

I need a gRPC server to handle incoming commands and also stream the outputs produced by the worker threads. I'm using tonic for the gRPC server, and it only offers an async implementation that runs inside a Tokio future.

I need to be able to send messages from my "normal" standard-library threads to the Tokio future.

I've boiled my code down to the minimum here:

use std::thread;
use tokio::sync::mpsc; // 1.9.0

fn main() {
    let (tx, mut rx) = mpsc::channel(1);

    let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
    tokio_runtime.spawn(async move {
        // in reality this starts the gRPC server; here I'm just demonstrating trying to receive a message
        while let Some(v) = rx.recv().await {}
    });

    let h = thread::spawn(move || {
        // do work
        tx.send(1).await; //<------ error occurs here since I can't await in a non-async block
    });

    h.join().unwrap();
}

How can my main worker threads communicate with the Tokio-spawned gRPC server?


Solution

  • You can use Tokio's sync primitives. There are two options: UnboundedSender, whose send() is not async and can therefore be called from any thread, and Sender::blocking_send().

    The issue with the unbounded sender is that it has no back-pressure: if your producer is faster than the consumer, the queue grows without limit, and your application may crash with an out-of-memory error or exhaust other limited resources your producer uses.

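    As a general rule, you should avoid unbounded queues, which leaves the better option of blocking_send(). It blocks the calling thread until the channel has capacity, so it must only be called from outside an async context, such as a std::thread worker: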

    use std::thread;
    use tokio::sync::mpsc; // 1.9.0
    
    fn main() {
        let (tx, mut rx) = mpsc::channel(1);
    
        let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
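        // keep the JoinHandle so main can wait for the receiver before the runtime is dropped
        let receiver = tokio_runtime.spawn(async move {
            // in reality this starts the gRPC server; here I'm just demonstrating receiving a message
            while let Some(v) = rx.recv().await {
                println!("Received: {:?}", v);
            }
        });
    
        let h = thread::spawn(move || {
            // do work
            tx.blocking_send(1).unwrap();
        });
    
        h.join().unwrap();
        // tx was dropped when the worker thread finished, so the receive loop ends;
        // wait for the task so the message is printed before the runtime shuts down
        tokio_runtime.block_on(receiver).unwrap();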
    }