I am attempting to use the reqwest
crate to stream binary data in Arrow IPC format from a REST API endpoint outside of my control. The reqwest::Response
object has a bytes_stream()
method that returns a type implementing the trait Stream<Item = Result<Bytes>>
. I'm hoping it's possible to read this as a RecordBatch stream in some way, such as with arrow_ipc::reader::StreamReader
or some other implementer of the arrow_array::RecordBatchReader
trait. What's the best way to do this?
The arrow
crate does not have support for async currently (although there is an open issue for that), but the alternative arrow2
does, and luckily it also defines a conversion layer between its types and arrow
's types. So this is one option (the code is quite involved because arrow2
's async support requires an AsyncRead
, but what we have is a Stream
yielding Bytes
chunks):
use std::io;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use arrow2::datatypes::Schema;
use arrow2::io::ipc::read::stream_async::AsyncStreamReader;
use arrow2::io::ipc::read::StreamMetadata;
use arrow2::io::ipc::IpcSchema;
use arrow_format::ipc::MetadataVersion;
use bytes::Bytes;
use futures::stream::Fuse;
use futures::{AsyncRead, Stream, StreamExt};
use reqwest::Client;
/// Adapter that lets a stream of byte chunks be consumed through
/// `futures::AsyncRead`.
struct StreamAsAsyncRead<St> {
    /// The underlying chunk stream, wrapped by `StreamExt::fuse`.
    stream: Fuse<St>,
    /// The chunk currently being handed out; `None` until the first chunk
    /// has been pulled from `stream`.
    last: Option<Bytes>,
}

impl<St> StreamAsAsyncRead<St>
where
    St: Stream,
{
    /// Wraps `stream` in the adapter, starting with no buffered chunk.
    fn new(stream: St) -> Self {
        let stream = stream.fuse();
        Self { last: None, stream }
    }
}
impl<St: Stream<Item = reqwest::Result<Bytes>> + Unpin> AsyncRead for StreamAsAsyncRead<St> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let this = &mut *self;
let data = match &mut this.last {
Some(data) if !data.is_empty() => data,
last => {
let Some(next_data) = ready!(this.stream.poll_next_unpin(cx)) else {
return Poll::Ready(Ok(0));
};
let next_data = next_data.map_err(|err| io::Error::other(err))?;
last.insert(next_data)
}
};
let fill_len = std::cmp::min(buf.len(), data.len());
buf[..fill_len].copy_from_slice(&data[..fill_len]);
data.advance(fill_len);
Poll::Ready(Ok(fill_len))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::new();
let response = client.get("<address>").send().await?;
let data_metadata = StreamMetadata {
schema: Schema::default(),
version: MetadataVersion::V5,
ipc_schema: IpcSchema {
fields: Vec::new(),
is_little_endian: true,
},
};
let mut stream = AsyncStreamReader::new(
StreamAsAsyncRead::new(response.bytes_stream()),
data_metadata,
);
while let Some(item) = stream.next().await {
let item = item?;
for item in item.into_arrays() {
let item = arrow::array::ArrayRef::from(item);
// Do something with `item`.
}
}
Ok(())
}
Cargo.toml:
[dependencies]
arrow = "50.0.0"
# `stream` is needed for `Response::bytes_stream()`.
reqwest = { version = "0.11.24", features = ["stream"] }
tokio = { version = "1.36.0", features = ["full"] }
bytes = "1.5.0"
futures = "0.3.30"
arrow-format = "0.8.1"
# `io_ipc` + `io_ipc_read_async` provide the async IPC stream reader;
# `arrow` provides the conversions from arrow2 arrays to `arrow` arrays.
arrow2 = { version = "0.18.0", features = [
"io_ipc",
"io_ipc_read_async",
"arrow",
] }
Another, simpler option is to give up async support and just use reqwest's blocking interface (this requires enabling reqwest's "blocking" feature in Cargo.toml):
use arrow::ipc::reader::StreamReader;
use reqwest::blocking::Client;
/// Requests `<address>` with reqwest's blocking client and iterates over the
/// record batches of the Arrow IPC stream in the response body.
///
/// # Errors
///
/// Returns an error if the request fails, the server answers with an error
/// status, or the body cannot be decoded.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new();
    // Reject 4xx/5xx responses up front rather than handing an error body to
    // the IPC decoder.
    let response = client.get("<address>").send()?.error_for_status()?;
    // The response is passed directly as the reader; `None` is the projection
    // argument.
    let reader = StreamReader::try_new(response, None)?;
    for batch in reader {
        let batch = batch?;
        // Do something with `batch`.
    }
    Ok(())
}