asynchronous rust grpc

How can I use async_stream with tonic?


I'm trying to implement a gRPC server that returns a stream, using Tonic in Rust. Let's say we have a service like this:

service FooService {
  rpc FooBar(Input) returns (stream Output) {}
}

I generate the code for the boilerplate using prost-build. Following the examples on the site, I could implement the service:

use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{async_trait, Request, Response, Status};

#[derive(Default)]
pub struct FooServer;

#[async_trait]
impl FooService for FooServer {
    type FooBarStream = ReceiverStream<Result<Output, Status>>;

    async fn foo_bar(
        &self,
        request: Request<Input>,
    ) -> std::result::Result<Response<Self::FooBarStream>, Status> {
        let (tx, rx) = mpsc::channel(10);
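        // Produce the items in a background task and push them into the channel.
        tokio::spawn(async move {
            for ... {
                // Stop producing once the receiving side has been dropped.
                if tx.send(Ok(...)).await.is_err() {
                    break;
                }
            }
        });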

        Ok(Response::new(ReceiverStream::new(rx)))
    }
}

This works, but I think it's a bit ugly. After discovering async-stream, I thought of rewriting it as follows, since it needs less boilerplate and is easier to read. It might also be more efficient, because the stream is filled directly instead of going through a spawned task and a channel.

use async_stream::{stream, AsyncStream};
...

#[async_trait]
impl FooService for FooServer {
    type FooBarStream = AsyncStream?????

    async fn foo_bar(
        &self,
        request: Request<Input>,
    ) -> std::result::Result<Response<Self::FooBarStream>, Status> {
        let stream = stream! {
            for ... {
                yield ...;
            }
        };

        Ok(Response::new(stream))
    }
}

The problem is that I cannot figure out the type of the stream. I tried AsyncStream<Result<Output, Status>>, but that doesn't work because AsyncStream has two generic parameters: the second is the type of the future that generates the items, which in this case is anonymous and cannot be named.
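
The same situation can be reproduced with the standard library alone, which may make the problem easier to see. The sketch below is an analogy, not tonic or async-stream code: std::iter::from_fn returns FromFn<F>, where F is the closure's anonymous type, much as stream! returns an AsyncStream<T, U> whose U is an anonymous future. The function name countdown is hypothetical and only used for illustration.

```rust
use std::iter;

/// Returns an iterator that yields `from, from - 1, ..., 1`.
///
/// The concrete return type is `iter::FromFn<{closure}>`. The closure type
/// has no name that can be written in source code, so the signature uses
/// `impl Iterator` instead of spelling the type out.
fn countdown(from: u32) -> impl Iterator<Item = u32> {
    let mut next = from;
    iter::from_fn(move || {
        if next == 0 {
            // Returning `None` ends the iteration.
            None
        } else {
            let current = next;
            next -= 1;
            Some(current)
        }
    })
}
```

Return-position impl Trait is enough for a free function like this one, but an associated type such as FooBarStream needs a type that can actually be written down, which is where the difficulty comes from.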

Is there any way to make this work?


Solution

  • Since the complete AsyncStream type cannot be named, and since impl Trait in associated-type position (i.e. type FooBarStream = impl Stream<...>;) is not yet available on stable Rust, you'll have to do the next best thing: use a trait object.

    The best way to do that is with Box::pin:

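    use std::pin::Pin;

    use async_stream::stream;
    use tokio_stream::Stream;
    use tonic::{async_trait, Request, Response, Status};

    #[async_trait]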
    impl FooService for FooServer {
        type FooBarStream = Pin<Box<dyn Stream<Item = Result<Output, Status>> + Send + 'static>>;
    
        async fn foo_bar(
            &self,
            request: Request<Input>,
        ) -> std::result::Result<Response<Self::FooBarStream>, Status> {
            let stream = stream! {
                for ... {
                    yield ...;
                }
            };
    
            Ok(Response::new(Box::pin(stream) as Self::FooBarStream))
        }
    }
    

    You can see this done in the Bidirectional streaming RPC part of the Tonic guide, though they do use try_stream! in order to use ? for errors.
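
The type-erasure step can also be tried without any external crates. The following is a minimal sketch under stated assumptions: Stream is not part of the standard library, so Future stands in for it, and the Service trait, the Doubler type, and the block_on helper are hypothetical names invented for this example rather than anything from tonic. The point is only that Box::pin turns an anonymous async block into a type that can be written in an associated type.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a generated service trait: the associated
/// type must be a type that can be written down.
trait Service {
    type Reply: Future<Output = u32>;

    fn call(&self, input: u32) -> Self::Reply;
}

struct Doubler;

impl Service for Doubler {
    // The async block in `call` has an anonymous type, so it is erased behind
    // a pinned, boxed trait object, mirroring `Pin<Box<dyn Stream<...> + Send>>`.
    type Reply = Pin<Box<dyn Future<Output = u32> + Send + 'static>>;

    fn call(&self, input: u32) -> Self::Reply {
        // `Box::pin` yields `Pin<Box<{async block}>>`, which coerces to the
        // trait object named by `Self::Reply`.
        Box::pin(async move { input * 2 })
    }
}

/// Waker that does nothing when woken.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Tiny executor for this example only: polls in a loop until the future
/// is ready. A real program would use a runtime such as tokio instead.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return value;
        }
    }
}
```

Calling block_on(Doubler.call(21)) drives the boxed future to completion. The cost of this approach is one heap allocation and dynamic dispatch per returned object, which is usually negligible next to the network I/O of an RPC.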