How can I `flatmap` streams in Rust?


I have a rusoto_core::ByteStream which implements futures' Stream trait:

let chunks = vec![b"1234".to_vec(), b"5678".to_vec()];
let stream = ByteStream::new(stream::iter_ok(chunks));

I'd like to pass it to actix_web's HttpResponseBuilder::streaming method.

use actix_web::dev::HttpResponseBuilder; // 0.7.18
use rusoto_core::ByteStream; // 0.36.0

fn example(stream: ByteStream, builder: HttpResponseBuilder) {
    builder.streaming(stream);
}

When I try to do it I receive the following error:

error[E0271]: type mismatch resolving `<rusoto_core::stream::ByteStream as futures::stream::Stream>::Item == bytes::bytes::Bytes`
 --> src/main.rs:5:13
  |
5 |     builder.streaming(stream);
  |             ^^^^^^^^^ expected struct `std::vec::Vec`, found struct `bytes::bytes::Bytes`
  |
  = note: expected type `std::vec::Vec<u8>`
             found type `bytes::bytes::Bytes`

I believe the reason is that streaming() expects a S: Stream<Item = Bytes, Error> (i.e., Item = Bytes) but my ByteStream has Item = Vec<u8>. How can I fix it?

I think the solution is to flatmap my ByteStream somehow but I couldn't find such a method for streams.

Here's an example of how streaming() can be used:

let text = "123";
let (tx, rx_body) = mpsc::unbounded();
let _ = tx.unbounded_send(Bytes::from(text.as_bytes()));

HttpResponse::Ok()
    .streaming(rx_body.map_err(|e| error::ErrorBadRequest("bad request")))

Solution

  • How can I flatmap streams in Rust?

    A flat map turns each item into an iterator (or stream) and then concatenates those into a single iterator (or stream). In other words, it converts an iterator of iterators into one flat iterator.
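
To make the definition concrete, here is a minimal sketch using only standard-library iterators rather than streams. It reuses the `into_many` name from the stream examples in this answer, but the two wrapper functions (`flat_mapped`, `mapped_then_flattened`) are hypothetical names introduced for illustration. It shows that a flat map is the same as a `map` followed by a `flatten`.

```rust
/// Expands one number into the range 0..i (iterator analogue of the stream helper).
fn into_many(i: i32) -> impl Iterator<Item = i32> {
    0..i
}

/// One step: `flat_map` maps each item to an iterator and concatenates the results.
fn flat_mapped() -> Vec<i32> {
    into_many(5).flat_map(into_many).collect()
}

/// Two steps: `map` builds an iterator of iterators, `flatten` removes one level.
fn mapped_then_flattened() -> Vec<i32> {
    into_many(5).map(into_many).flatten().collect()
}
```

Both functions yield the same sequence, which is why `map` plus `flatten` can stand in wherever a direct flat map is missing.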

    Futures 0.3

    Futures 0.3.1 doesn't have a direct flat map (later 0.3 releases add StreamExt::flat_map), but it does have StreamExt::flatten, which can be used after a StreamExt::map.

    use futures::{stream, Stream, StreamExt}; // 0.3.1
    
    fn into_many(i: i32) -> impl Stream<Item = i32> {
        stream::iter(0..i)
    }
    
    fn nested() -> impl Stream<Item = i32> {
        let stream_of_number = into_many(5);
        let stream_of_stream_of_number = stream_of_number.map(into_many);
        let flat_stream_of_number = stream_of_stream_of_number.flatten();
    
        // Returns: 0, 0, 1, 0, 1, 2, 0, 1, 2, 3
        flat_stream_of_number
    }
    

    Futures 0.1

    Futures 0.1 doesn't have a direct flat map, but it does have Stream::flatten, which can be used after a Stream::map.

    use futures::{stream, Stream}; // 0.1.25
    
    fn into_many(i: i32) -> impl Stream<Item = i32, Error = ()> {
        stream::iter_ok(0..i)
    }
    
    fn nested() -> impl Stream<Item = i32, Error = ()> {
        let stream_of_number = into_many(5);
        let stream_of_stream_of_number = stream_of_number.map(into_many);
        let flat_stream_of_number = stream_of_stream_of_number.flatten();
    
        // Returns: 0, 0, 1, 0, 1, 2, 0, 1, 2, 3
        flat_stream_of_number
    }
    

    However, this doesn't solve your problem.

    "streaming() expects a S: Stream<Item = Bytes, Error> (i.e., Item = Bytes) but my ByteStream has Item = Vec<u8>"

    Yes, this is the problem. Use Bytes::from via Stream::map to convert your stream Item from one type to another:

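    use actix_web::dev::HttpResponseBuilder; // 0.7.18
    use bytes::Bytes; // 0.4.11
    use futures::Stream; // 0.1.25
    use rusoto_core::ByteStream; // 0.36.0
    
    fn example(stream: ByteStream, mut builder: HttpResponseBuilder) {
        // Convert each Vec<u8> item into Bytes; the number of items is unchanged.
        builder.streaming(stream.map(Bytes::from));
    }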
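
To see why `map` is the right tool here and a flat map is not, here is a rough sketch with standard-library iterators standing in for the stream. `Chunk` is a hypothetical stand-in for `bytes::Bytes` so the example needs no external crates, and the function names are made up for illustration.

```rust
/// Hypothetical stand-in for `bytes::Bytes` (assumption: only the From<Vec<u8>> conversion matters here).
#[derive(Debug, PartialEq)]
struct Chunk(Vec<u8>);

impl From<Vec<u8>> for Chunk {
    fn from(data: Vec<u8>) -> Self {
        Chunk(data)
    }
}

/// The same two chunks as in the question.
fn chunks() -> Vec<Vec<u8>> {
    vec![b"1234".to_vec(), b"5678".to_vec()]
}

/// `map` converts each item and keeps chunk boundaries: two items in, two items out.
fn converted() -> Vec<Chunk> {
    chunks().into_iter().map(Chunk::from).collect()
}

/// A flat map would instead split every chunk into individual bytes: eight `u8` items.
fn flattened() -> Vec<u8> {
    chunks().into_iter().flat_map(|chunk| chunk.into_iter()).collect()
}
```

`converted()` keeps one output item per input chunk, which is the shape `streaming()` asks for, while `flattened()` changes the item type to `u8` and loses the chunking.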