I came across a situation where we need to use a plain gRPC client (through the grpc.aio API) to talk to an Arrow Flight gRPC server.
The DoGet call did make it to the server, and we have received a FlightData in response. If our understanding of the Flight gRPC definition is correct, the response contains a flatbuffers message that can somehow be decoded into a RecordBatch.
The following is the client-side code:
    import asyncio
    import pathlib

    import grpc
    import pyarrow as pa
    import pyarrow.flight as pf

    import flight_pb2, flight_pb2_grpc


    async def main():
        ticket = pf.Ticket("tick")
        sock_file = pathlib.Path.cwd().joinpath("arena.sock").resolve()
        async with grpc.aio.insecure_channel(f"unix://{sock_file}") as channel:
            stub = flight_pb2_grpc.FlightServiceStub(channel)
            async for data in stub.DoGet(flight_pb2.Ticket(ticket=ticket.ticket)):
                assert type(data) is flight_pb2.FlightData
                print(data)
                # How to convert data into a RecordBatch?


    asyncio.run(main())
Currently we are stuck on this last step of decoding the FlightData response.
The question is twofold:
1. Is there an existing utility in pyarrow.flight that we can use to decode a Python gRPC object of the FlightData type?
2. If not, how can we decode the content of FlightData and reconstruct a RecordBatch from scratch?

The main interest here is to use asyncio with a plain gRPC client. As far as we can tell, this is not feasible with the current version of the Arrow Flight gRPC client.
There is indeed no utility exposed in pyarrow.flight for this.
FlightData contains, among other things, the Arrow IPC header and body, so you can instead decode it using pyarrow.ipc. Here's an example:
    import asyncio
    import pathlib
    import struct

    import grpc
    import pyarrow as pa
    import pyarrow.flight as pf

    import Flight_pb2, Flight_pb2_grpc


    async def main():
        ticket = pf.Ticket("tick")
        async with grpc.aio.insecure_channel("localhost:1234") as channel:
            stub = Flight_pb2_grpc.FlightServiceStub(channel)
            schema = None
            async for data in stub.DoGet(Flight_pb2.Ticket(ticket=ticket.ticket)):
                # 4 bytes: Need IPC continuation token
                token = b'\xff\xff\xff\xff'
                # 4 bytes: message length (little-endian)
                length = struct.pack('<I', len(data.data_header))
                buf = pa.py_buffer(token + length + data.data_header + data.data_body)
                message = pa.ipc.read_message(buf)
                print(message)
                if schema is None:
                    # This should work but is unimplemented
                    # print(pa.ipc.read_schema(message))
                    schema = pa.ipc.read_schema(buf)
                    print(schema)
                else:
                    batch = pa.ipc.read_record_batch(message, schema)
                    print(batch)
                    print(batch.to_pydict())


    asyncio.run(main())
Server:
    import pyarrow.flight as flight
    import pyarrow as pa


    class TestServer(flight.FlightServerBase):
        def do_get(self, context, ticket):
            table = pa.table([[1, 2, 3, 4]], names=["a"])
            return flight.RecordBatchStream(table)


    TestServer("grpc://localhost:1234").serve()
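The framing step in the client above (continuation token, length prefix, header, body) can be pulled out into a small helper that needs only the standard library. This is a minimal sketch, not part of pyarrow: the name `frame_ipc_message` is hypothetical. It also pads the header to a multiple of 8 bytes, on the assumption that the Arrow IPC encapsulated message format expects aligned metadata with the length prefix counting the padding; the client above omits that step, and headers sent by a Flight server may already be aligned.

```python
import struct

# IPC continuation token that precedes every encapsulated message.
CONTINUATION = b"\xff\xff\xff\xff"


def frame_ipc_message(data_header: bytes, data_body: bytes = b"") -> bytes:
    """Wrap a FlightData header/body pair in Arrow IPC message framing.

    Hypothetical helper: mirrors the manual framing in the client above.
    """
    # Assumption: metadata is padded to an 8-byte boundary and the
    # length prefix includes that padding.
    padding = (-len(data_header)) % 8
    metadata = data_header + b"\x00" * padding
    # 4 bytes: metadata length (little-endian 32-bit integer).
    length = struct.pack("<i", len(metadata))
    return CONTINUATION + length + metadata + data_body


# Dummy bytes stand in for a real flatbuffers header and record batch body.
framed = frame_ipc_message(b"\x01\x02\x03", b"BODY")
print(framed[:8].hex())
```

In the client, the result would be passed on as `pa.py_buffer(frame_ipc_message(data.data_header, data.data_body))` before calling `pa.ipc.read_message`.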
There's some discussion about async Flight APIs; please join the dev@ mailing list if you would like to chime in.