I am new to Rust and writing a simple application that will stream some values over gRPC, using Tonic. These values are initially acquired from an external library as a BoxStream (Pin<Box<Stream>>
), and tonic's API requires something that implements Stream
(which of course Pin does not).
Tonic's streaming example uses a ReceiverStream to convert a mpsc channel into a stream, and spinning off a thread to push values into it. This would require a stream lifetime of 'static
which is not an option for my actual implementation because the lifetime of my stream is associated with the class that returns it.
What is the best way to provide something that implements Stream, that I can give to Tonic, from my Pin<Box<Stream>>
?
src/main.rs (This will not compile, since BoxStream<'static, Entry> does not implement IntoStreamingRequest)
use futures::prelude::stream::BoxStream;
use async_stream::stream;
use tonic::{IntoStreamingRequest};
struct Entry {
key: String,
}
fn main() {
// Create Request
let stream: BoxStream<'static, Entry> = api_function();
let request = stream.into_streaming_request();
// Send request
//let mut client = DataImporterClient::connect("http://[::1]:50051").await.unwrap();
//let response = client.grpc_function(request).await?;
}
fn api_function() -> BoxStream<'static, Entry> {
Box::pin(stream! {
let entries = vec!(
Entry {key: String::from("value1")},
Entry {key: String::from("value2")}
);
for entry in entries {
yield entry;
}
})
}
Cargo.toml
[package]
name = "tonic-streaming-minimum-example"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tonic = "0.5"
futures = "0.3"
tokio-stream = "0.1"
async-stream = "0.3"
Compilation Error provided:
error[E0599]: the method `into_streaming_request` exists for struct `Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>`, but its trait bounds were not satisfied
--> src\main.rs:12:26
|
12 | let request = stream.into_streaming_request();
| ^^^^^^^^^^^^^^^^^^^^^^ method cannot be called on `Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>` due to unsatisfied trait bounds
|
::: C:\Users\tmathews\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\core\src\pin.rs:408:1
|
408 | pub struct Pin<P> {
| -----------------
| |
| doesn't satisfy `_: IntoStreamingRequest`
| doesn't satisfy `_: Sync`
|
::: C:\Users\tmathews\.cargo\registry\src\github.com-1ecc6299db9ec823\futures-core-0.3.17\src\stream.rs:27:1
|
27 | pub trait Stream {
| ----------------
| |
| doesn't satisfy `_: IntoStreamingRequest`
| doesn't satisfy `_: Sized`
| doesn't satisfy `_: Sync`
|
= note: the following trait bounds were not satisfied:
`Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: Sync`
which is required by `Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
`&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: futures::Stream`
which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
`&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: std::marker::Send`
which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
`&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: Sync`
which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
`&mut Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>: Sync`
which is required by `&mut Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>: IntoStreamingRequest`
`dyn futures::Stream<Item = Entry> + std::marker::Send: Sized`
which is required by `dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`dyn futures::Stream<Item = Entry> + std::marker::Send: Sync`
which is required by `dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`&dyn futures::Stream<Item = Entry> + std::marker::Send: futures::Stream`
which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`&dyn futures::Stream<Item = Entry> + std::marker::Send: std::marker::Send`
which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`&dyn futures::Stream<Item = Entry> + std::marker::Send: Sync`
which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): futures::Stream`
which is required by `&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): IntoStreamingRequest`
`&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): Sync`
which is required by `&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): IntoStreamingRequest`
The problem is that tonic implements IntoStreamingRequest
only for types that are both Send
and Sync
:
impl<T> IntoStreamingRequest for T
where
T: Stream + Send + Sync + 'static
But BoxStream
is not:
pub type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + 'a>>;
Instead of using BoxStream
you should copy its definition and add an additional + Sync
bound:
fn api_function() -> Pin<Box<dyn Stream<Item = Entry> + Send + Sync + 'static>> {
Box::pin(stream! {
let entries = vec!(
Entry {key: String::from("value1")},
Entry {key: String::from("value2")}
);
for entry in entries {
yield entry;
}
})
}
And because the stream returned by the stream!()
macro is already Send + Sync
your code will compile fine.
PS: remove the unnecessary type hint at:
let stream: BoxStream<'static, Entry> = api_function();
// should become:
let stream = api_function(); // after the above change it's not BoxStream anymore!