
How to decode Arrow Flight `FlightData` with a pure gRPC client


I came across a situation where we need to use a plain gRPC client (through the grpc.aio API) to talk to an Arrow Flight gRPC server.

The DoGet call reaches the server, and we receive a FlightData message in response. If our understanding of the Flight gRPC definition is correct, the response contains a FlatBuffers message that can be decoded into a RecordBatch.

Here is the client-side code:

import asyncio
import pathlib

import grpc
import pyarrow as pa
import pyarrow.flight as pf

import flight_pb2, flight_pb2_grpc

async def main():
    ticket = pf.Ticket("tick")
    sock_file = pathlib.Path.cwd().joinpath("arena.sock").resolve()
    async with grpc.aio.insecure_channel(f"unix://{sock_file}") as channel:
        stub = flight_pb2_grpc.FlightServiceStub(channel)
        async for data in stub.DoGet(flight_pb2.Ticket(ticket=ticket.ticket)):
            assert type(data) is flight_pb2.FlightData
            print(data)
            # How to convert data into a RecordBatch?

asyncio.run(main())

We are currently stuck on this last step: decoding the FlightData response.

The question is twofold:

  1. Are there existing facilities in pyarrow.flight that can decode a Python gRPC object of type FlightData?
  2. If not, what other options are there for decoding the contents of the FlightData and reconstructing a RecordBatch from scratch?

The main goal is to use the asyncio support of the plain gRPC client. As far as we can tell, this is not possible with the current version of the Arrow Flight gRPC client.


Solution

  • There is indeed no utility exposed in pyarrow.flight for this.

    FlightData contains, among other things, the Arrow IPC message header (data_header) and body (data_body), so you can decode it with pyarrow.ipc instead. Here's an example:

    import asyncio
    import struct
    
    import grpc
    import pyarrow as pa
    import pyarrow.flight as pf
    
    import Flight_pb2, Flight_pb2_grpc
    
    async def main():
        ticket = pf.Ticket("tick")
        async with grpc.aio.insecure_channel("localhost:1234") as channel:
            stub = Flight_pb2_grpc.FlightServiceStub(channel)
            schema = None
            async for data in stub.DoGet(Flight_pb2.Ticket(ticket=ticket.ticket)):
                # 4 bytes: IPC continuation token
                token = b'\xff\xff\xff\xff'
                # 4 bytes: message length (little-endian)
                length = struct.pack('<I', len(data.data_header))
                buf = pa.py_buffer(token + length + data.data_header + data.data_body)
                message = pa.ipc.read_message(buf)
                print(message)
                if schema is None:
                    # The first message carries the schema. Passing the Message
                    # itself should work but is unimplemented, so read from buf:
                    # print(pa.ipc.read_schema(message))
                    schema = pa.ipc.read_schema(buf)
                    print(schema)
                else:
                    batch = pa.ipc.read_record_batch(message, schema)
                    print(batch)
                    print(batch.to_pydict())
    
    asyncio.run(main())
    

    Server:

    import pyarrow.flight as flight
    import pyarrow as pa
    
    class TestServer(flight.FlightServerBase):
        def do_get(self, context, ticket):
            table = pa.table([[1,2,3,4]], names=["a"])
            return flight.RecordBatchStream(table)
    
    TestServer("grpc://localhost:1234").serve()
    

    There is ongoing discussion about async Flight APIs; please join the Arrow dev@ mailing list if you would like to chime in.