I am using hyper 0.12 to build a proxy service. When receiving a response body from the upstream server I want to forward it back to the client ASAP, and save the contents in a buffer for later processing.
So I need a function that:
- takes a Stream (a hyper::Body, to be precise)
- returns a Stream that is functionally identical to the input stream
- also returns a Future<Item = Vec<u8>, Error = ...> that is resolved with the buffered contents of the input stream, when the output stream is completely consumed

I can't for the life of me figure out how to do this.
I guess the function I'm looking for will look something like this:
type BufferFuture = Box<dyn Future<Item = Vec<u8>, Error = ()>>;

pub fn copy_body(body: hyper::Body) -> (hyper::Body, BufferFuture) {
    let body2 = ... // ???
    let buffer = body.fold(Vec::<u8>::new(), |mut buf, chunk| {
        buf.extend_from_slice(&chunk);
        // ...somehow send this chunk to body2 also?
    });
    // No trailing semicolon: the tuple is the return value.
    (body2, buffer)
}
Below is what I have tried, and it works until send_data() fails (obviously).
type BufferFuture = Box<dyn Future<Item = Vec<u8>, Error = ()>>;

pub fn copy_body(body: hyper::Body) -> (hyper::Body, BufferFuture) {
    let (mut sender, body2) = hyper::Body::channel();
    let consume =
        body.map_err(|_| ()).fold(Vec::<u8>::new(), move |mut buf, chunk| {
            buf.extend_from_slice(&chunk);
            // What to do if this fails?
            if sender.send_data(chunk).is_err() {}
            Box::new(future::ok(buf))
        });
    // No trailing semicolon: the tuple is the return value.
    (body2, Box::new(consume))
}
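To make the failure mode concrete, here is a minimal standard-library analogy, not hyper code. It assumes (as I understand hyper 0.12) that send_data() can fail both when the receiving Body has been dropped and when the channel cannot currently accept another chunk because poll_ready() was never consulted. A bounded std::sync::mpsc channel with try_send stands in for hyper::Body::channel(); `TeeReport` and `tee_without_backpressure` are hypothetical names.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

/// What happened to the chunks when forwarding without waiting for capacity.
#[derive(Debug, Default, PartialEq)]
struct TeeReport {
    buffer: Vec<u8>,       // every byte seen, regardless of forwarding outcome
    forwarded: usize,      // chunks the channel accepted
    dropped_full: usize,   // chunks lost because the channel was full
    dropped_closed: usize, // chunks lost because the receiver was gone
}

/// Buffers every chunk and tries to forward it, ignoring send failures
/// in the same way the `if sender.send_data(chunk).is_err() {}` line does.
fn tee_without_backpressure<I>(chunks: I, forward: &SyncSender<Vec<u8>>) -> TeeReport
where
    I: IntoIterator<Item = Vec<u8>>,
{
    let mut report = TeeReport::default();
    for chunk in chunks {
        report.buffer.extend_from_slice(&chunk);
        match forward.try_send(chunk) {
            Ok(()) => report.forwarded += 1,
            // The consumer is slower than the producer: the chunk is lost.
            Err(TrySendError::Full(_)) => report.dropped_full += 1,
            // The consumer went away: nothing more can be forwarded.
            Err(TrySendError::Disconnected(_)) => report.dropped_closed += 1,
        }
    }
    report
}
```

The buffer always ends up complete, but the forwarded copy silently loses chunks whenever the consumer falls behind, which is the part that makes the fire-and-forget approach unsuitable for a proxy.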
However, something tells me I am on the wrong track.
I have found Sink.fanout(), which seems like it is what I want, but I do not have a Sink, and I don't know how to construct one. hyper::Body implements Stream but not Sink.
What I ended up doing was implementing a new type of stream that does what I need. This appeared to be necessary because hyper::Body does not implement Sink, nor does hyper::Chunk implement Clone (which is required for Sink.fanout()), so I cannot use any of the existing combinators.
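The Clone requirement is inherent to any fan-out, not a quirk of one combinator: two consumers each need their own copy of every item. The sketch below shows this with the standard library only; `fan_out` is a hypothetical helper and std::sync::mpsc senders stand in for sinks.

```rust
use std::sync::mpsc::{SendError, Sender};

/// Delivers every item to both `left` and `right`.
/// Returns how many items reached both sides, or the first send error.
fn fan_out<T, I>(items: I, left: &Sender<T>, right: &Sender<T>) -> Result<usize, SendError<T>>
where
    T: Clone, // both receivers need their own copy of each item
    I: IntoIterator<Item = T>,
{
    let mut delivered = 0;
    for item in items {
        // Clone for the first side, then move the original into the second.
        left.send(item.clone())?;
        right.send(item)?;
        delivered += 1;
    }
    Ok(delivered)
}
```

A custom stream sidesteps the bound because it never duplicates the chunk itself: it copies the bytes out with extend_from_slice and then passes the original chunk along.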
First, a struct that holds the state we need, with methods to append a new chunk and to signal that the buffer is complete.
struct BodyClone<T> {
    body: T,
    // Both are Option so that flush() can take them exactly once.
    buffer: Option<Vec<u8>>,
    sender: Option<futures::sync::oneshot::Sender<Vec<u8>>>,
}

impl BodyClone<hyper::Body> {
    // Hands the buffer to the receiver; later calls are no-ops.
    fn flush(&mut self) {
        if let (Some(buffer), Some(sender)) = (self.buffer.take(), self.sender.take()) {
            // A failed send only means the receiver was dropped.
            if sender.send(buffer).is_err() {}
        }
    }

    // Appends a chunk and flushes early once content_length bytes have arrived.
    fn push(&mut self, chunk: &hyper::Chunk) {
        use hyper::body::Payload;

        let length = if let Some(buffer) = self.buffer.as_mut() {
            buffer.extend_from_slice(chunk);
            buffer.len() as u64
        } else {
            0
        };

        if let Some(content_length) = self.body.content_length() {
            if length >= content_length {
                self.flush();
            }
        }
    }
}
Then I implemented the Stream trait for this struct.
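use futures::{Async, Stream};

impl Stream for BodyClone<hyper::Body> {
    type Item = hyper::Chunk;
    type Error = hyper::Error;

    fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
        match self.body.poll() {
            // Copy the bytes, then pass the original chunk through untouched.
            Ok(Async::Ready(Some(chunk))) => {
                self.push(&chunk);
                Ok(Async::Ready(Some(chunk)))
            }
            // End of stream: make sure the buffer is delivered.
            Ok(Async::Ready(None)) => {
                self.flush();
                Ok(Async::Ready(None))
            }
            // NotReady and errors are forwarded as they are.
            other => other,
        }
    }
}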
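The same pass-through-and-copy pattern can be tried without hyper or futures. The sketch below is an analogy using only the standard library: an Iterator stands in for the Stream, a std::sync::mpsc channel stands in for the oneshot channel, and `TeeIter` and `tee` are hypothetical names. It leaves out the content_length early flush.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Iterator analogue of `BodyClone`: yields the inner chunks unchanged
/// while copying their bytes into a buffer.
struct TeeIter<I> {
    inner: I,
    buffer: Option<Vec<u8>>,
    sender: Option<Sender<Vec<u8>>>,
}

impl<I> TeeIter<I> {
    /// Hands the buffer to the receiver; `take()` makes this happen only once.
    fn flush(&mut self) {
        if let (Some(buffer), Some(sender)) = (self.buffer.take(), self.sender.take()) {
            // An error here only means nobody is waiting for the buffer.
            let _ = sender.send(buffer);
        }
    }
}

impl<I: Iterator<Item = Vec<u8>>> Iterator for TeeIter<I> {
    type Item = Vec<u8>;

    fn next(&mut self) -> Option<Vec<u8>> {
        match self.inner.next() {
            Some(chunk) => {
                if let Some(buffer) = self.buffer.as_mut() {
                    buffer.extend_from_slice(&chunk);
                }
                Some(chunk) // the original chunk is passed on, not a clone
            }
            None => {
                self.flush(); // input exhausted: deliver the full buffer
                None
            }
        }
    }
}

/// Wraps `inner` and returns the wrapper plus the receiving end for the buffer.
fn tee<I: Iterator<Item = Vec<u8>>>(inner: I) -> (TeeIter<I>, Receiver<Vec<u8>>) {
    let (sender, receiver) = channel();
    let wrapper = TeeIter { inner, buffer: Some(Vec::new()), sender: Some(sender) };
    (wrapper, receiver)
}
```

As with the stream version, the buffer only becomes available once the wrapper has been driven to the end, so whoever waits on the receiver depends on the forwarded side being fully consumed.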
Finally, I could define an extension method on hyper::Body:
use futures::Future;

pub type BufferFuture = Box<dyn Future<Item = Vec<u8>, Error = ()> + Send>;

trait CloneBody {
    fn clone_body(self) -> (hyper::Body, BufferFuture);
}

impl CloneBody for hyper::Body {
    fn clone_body(self) -> (hyper::Body, BufferFuture) {
        let (sender, receiver) = futures::sync::oneshot::channel();

        let cloning_stream = BodyClone {
            body: self,
            buffer: Some(Vec::new()),
            sender: Some(sender),
        };

        (
            hyper::Body::wrap_stream(cloning_stream),
            // The receiver fails if the stream is dropped before it flushes.
            Box::new(receiver.map_err(|_| ())),
        )
    }
}
This can be used as follows:
let (body, buffer): (hyper::Body, BufferFuture) = body.clone_body();