I want to stream an object retrieved with the AWS SDK, which is of type aws_smithy_types::byte_stream::ByteStream, as an Actix Web response, which expects a futures::Stream.
Here is the get function:
pub async fn get(&self, object_name: &str) -> Option<ByteStream> {
    let get_request = self
        .client
        .get_object()
        .bucket(&self.bucket_name)
        .key(object_name)
        .send()
        .await;
    match get_request {
        Ok(result) => {
            debug!("{:?}", result);
            info!("Got successfully {} from {}", object_name, self.bucket_name);
            // Hand the body back as-is so the caller can stream it.
            Some(result.body)
        }
        Err(err) => {
            error!("{:?}", err);
            error!("Unable to get {} from {}.", object_name, self.bucket_name);
            None
        }
    }
}
And here is where I want to return the stream directly to the client:
// Get the file content as a stream of bytes
if let Some(mut stream) = s3.get(&obj_id.to_string()).await {
    // while let Ok(Some(bytes)) = stream.try_next().await {
    //     println!("{:?}", bytes);
    // }
    // HttpResponse::Ok().body("Stream")
}
I have tried several approaches but could not get any of them to work.
The way to create a streaming response is simply:
HttpResponseBuilder::new(StatusCode::OK).streaming(stream)
But .streaming() expects something that implements Stream<Item = Result<Bytes, E>>, which aws-smithy-types' ByteStream does not. Fortunately, ByteStream can expose its data through tokio's AsyncRead trait (via into_async_read()), and the tokio-util crate provides ReaderStream, which converts an AsyncRead into a Stream.
Here's how that would look:
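[dependencies]
actix-web = "4.7.0"
aws-smithy-types = { version = "1.2.0", features = ["rt-tokio"] }
# The `io` feature gates tokio_util::io; listing it avoids relying on another crate enabling it.
tokio-util = { version = "0.7.11", features = ["io"] }

use tokio_util::io::ReaderStream;

if let Some(stream) = ... {
    // Adapt the AsyncRead view of the ByteStream into a Stream of byte chunks.
    let stream = ReaderStream::new(stream.into_async_read());
    HttpResponseBuilder::new(StatusCode::OK).streaming(stream)
} ...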
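If the adapter step feels opaque, here is a rough sketch of the idea behind it, written as a blocking, std-only analogue rather than the real thing. ChunkedReader is a hypothetical name made up for this illustration; it is not part of tokio-util, and ReaderStream's actual implementation is asynchronous and yields Bytes rather than Vec<u8>. The shape is the same, though: read into a buffer repeatedly and hand out each filled chunk as a Result until the reader reports end of input.

```rust
use std::io::{self, Read};

/// Hypothetical blocking stand-in for what an AsyncRead-to-Stream adapter does:
/// it turns any `Read` source into an iterator of byte chunks.
pub struct ChunkedReader<R> {
    /// Set to `None` once end of input or an error has been seen.
    reader: Option<R>,
    /// Maximum number of bytes handed out per item.
    chunk_size: usize,
}

impl<R: Read> ChunkedReader<R> {
    pub fn new(reader: R, chunk_size: usize) -> Self {
        assert!(chunk_size > 0, "chunk_size must be non-zero");
        Self { reader: Some(reader), chunk_size }
    }
}

impl<R: Read> Iterator for ChunkedReader<R> {
    type Item = io::Result<Vec<u8>>;

    fn next(&mut self) -> Option<Self::Item> {
        // Once the source is exhausted (or failed), keep returning `None`.
        let reader = self.reader.as_mut()?;
        let mut buf = vec![0u8; self.chunk_size];

        // Retry reads that were merely interrupted; stop on anything else.
        let result = loop {
            match reader.read(&mut buf) {
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                other => break other,
            }
        };

        match result {
            // Zero bytes read means end of input: finish the iteration.
            Ok(0) => {
                self.reader = None;
                None
            }
            // Hand out only the bytes that were actually filled.
            Ok(n) => {
                buf.truncate(n);
                Some(Ok(buf))
            }
            // Surface the error once, then end the iteration.
            Err(e) => {
                self.reader = None;
                Some(Err(e))
            }
        }
    }
}
```

Each item is a Result, which mirrors why the adapted stream fits the Stream<Item = Result<Bytes, E>> shape that .streaming() asks for: an I/O failure partway through the body shows up as an Err item instead of being silently dropped.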