rustgrpctonicprost

Cannot determine return type for grpc stream


I am trying to implement a simple stream rpc using tonic and grpc. I have been following the routeguide tutorial.

When I try to return a tokio_stream::wrappers::ReceiverStream from my stream method I get compile error indicating I should return a Result wrapped by a ReceiverStream.

warning: unused manifest key: package.author
   Compiling prng_generator v0.1.0 (/home/babbleshack/projects/prng_generator)
error[E0308]: mismatched types
  --> src/bin/server.rs:51:46
   |
51 |         Ok(Response::new(ReceiverStream::new(rx)))
   |                                              ^^ expected enum `Result`, found struct `PrngResponse`
   |
   = note: expected struct `tokio::sync::mpsc::Receiver<Result<PrngResponse, Status>>`
              found struct `tokio::sync::mpsc::Receiver<PrngResponse>`
note: return type inferred to be `tokio::sync::mpsc::Receiver<Result<PrngResponse, Status>>` here
  --> src/bin/server.rs:41:67
   |
41 |       ) -> Result<Response<Self::PRNGServiceRequestStream>, Status> {
   |  ___________________________________________________________________^
42 | |         let (mut tx, rx) = mpsc::channel(4);
43 | |         let response_data = self.data.clone();
44 | |
...  |
51 | |         Ok(Response::new(ReceiverStream::new(rx)))
52 | |     }
   | |_____^

For more information about this error, try `rustc --explain E0308`.
error: could not compile `prng_generator` due to previous error

When I wrap my return channel in a Result I get a contradicting error message:

error[E0308]: mismatched types
  --> src/bin/server.rs:51:46
   |
51 |         Ok(Response::new(ReceiverStream::new(Ok(rx))))
   |                                              ^^^^^^ expected struct `tokio::sync::mpsc::Receiver`, found enum `Result`
   |
   = note: expected struct `tokio::sync::mpsc::Receiver<Result<PrngResponse, Status>>`
                found enum `Result<tokio::sync::mpsc::Receiver<PrngResponse>, _>`
note: return type inferred to be `tokio::sync::mpsc::Receiver<Result<PrngResponse, Status>>` here
  --> src/bin/server.rs:41:67
   |
41 |       ) -> Result<Response<Self::PRNGServiceRequestStream>, Status> {
   |  ___________________________________________________________________^
42 | |         let (mut tx, rx) = mpsc::channel(4);
43 | |         let response_data = self.data.clone();
44 | |
...  |
51 | |         Ok(Response::new(ReceiverStream::new(Ok(rx))))
52 | |     }
   | |_____^

Here is my proto:

use std::sync::Arc;
use tokio::sync::mpsc;
use tonic::{Request, Response, Status};

use tokio_stream::wrappers::ReceiverStream;

pub mod types {
    tonic::include_proto!("types");
}
use types::prng_service_server::PrngService;
use types::{PrngRequest, PrngResponse};


And the implementing rust code:

use std::sync::Arc;
use tokio::sync::mpsc;
use tonic::{Request, Response, Status};

use tokio_stream::wrappers::ReceiverStream;

pub mod types {
    tonic::include_proto!("types");
}
use types::prng_service_server::PrngService;
use types::{PrngRequest, PrngResponse};

#[derive(Debug, Default)] 
pub struct PRNGServiceImpl{
    data: Arc<Vec<PrngResponse>>,
}


#[tonic::async_trait]
impl PrngService for PRNGServiceImpl {
    type PRNGServiceRequestStream = ReceiverStream<Result<PrngResponse, Status>>;

    async fn prng_service_request(
        &self,
        request: Request<PrngRequest>,
    ) -> Result<Response<Self::PRNGServiceRequestStream>, Status> {
        let (mut tx, rx) = mpsc::channel(256);
        let response_data = self.data.clone();

        tokio::spawn(async move {
            for response in &response_data[..] {
               Ok(tx.send(response.clone()).await.unwrap());
            }
            println!(" /// done sending");
        });
        Ok(Response::new(ReceiverStream::new(rx)))
        //Ok(Response::new(ReceverStream::new(Ok(rx))))
    }
}

How do I determine what my return type should be here?


Solution

  • The error message indicates that your return type declares a stream that produces Result<PrngResponse, Status> values, but the stream you have given it produces PrngResponse values. Your attempt to fix the solution wraps the receiver channel in a Result, which is not the same thing.

    To fix this, you need to change the type that rx is inferred to be. It's inferred to be a stream of PrngResponse because of the tx.send() call, which sends a PrngResponse, so you can fix this by sending a Result instead:

    tx.send(Ok(response.clone()))
    

    The compiler points at the returned value and not the tx.send() line because the problem is the mismatch between the declared return type of the function and what it actually returns. The compiler could probably figure out that this is due to the tx.send() invocation, but in many cases type inference uses information spread out over multiple lines and there may not be one single line responsible for the inferred type of the returned value.

    One way you could trace the problem to its source is to provide an explicit type somewhere matching the return type. For example:

    let (mut tx, rx) = mpsc::channel::<Result<PrngResponse, Status>>(256);
    

    This change would resolve the return type issue, and the compiler would have then pointed at the tx.send() line indicating that the sent value is not a Result.