
I came across a situation where we need to use a plain gRPC client (through the grpc.aio API) to talk to an Arrow Flight gRPC server.

The DoGet call reaches the server, and we receive FlightData messages in response. If our understanding of the Flight gRPC definition is correct, each response carries a flatbuffers-encoded message that can be decoded into a RecordBatch, but we have not figured out how.

Here is the client-side code:

import asyncio
import pathlib

import grpc
import pyarrow as pa
import pyarrow.flight as pf

import flight_pb2, flight_pb2_grpc

async def main():
    ticket = pf.Ticket("tick")
    sock_file = pathlib.Path.cwd().joinpath("arena.sock").resolve()
    async with grpc.aio.insecure_channel(f"unix://{sock_file}") as channel:
        stub = flight_pb2_grpc.FlightServiceStub(channel)
        async for data in stub.DoGet(flight_pb2.Ticket(ticket=ticket.ticket)):
            assert type(data) is flight_pb2.FlightData
            print(data)
            # How to convert data into a RecordBatch?

asyncio.run(main())

We are currently stuck on this last step: decoding the FlightData response.

The question is twofold:

  1. Are there existing facilities in pyarrow.flight that we can use to decode a Python gRPC object of the FlightData type?
  2. If not, what other options are there to decode the content of a FlightData message and reconstruct a RecordBatch from scratch?

The main goal here is to use the asyncio API of the plain gRPC client, which, as far as we can tell, is not supported by the current version of the Arrow Flight gRPC client.

liuyu

1 Answer


There is indeed no utility exposed in pyarrow.flight for this.

FlightData contains, among other things, the Arrow IPC message header (data_header) and body (data_body). So you can decode it with pyarrow.ipc instead, after restoring the IPC framing prefix that FlightData omits. Here's an example:

import asyncio
import pathlib
import struct

import grpc
import pyarrow as pa
import pyarrow.flight as pf

import Flight_pb2, Flight_pb2_grpc

async def main():
    ticket = pf.Ticket("tick")
    async with grpc.aio.insecure_channel("localhost:1234") as channel:
        stub = Flight_pb2_grpc.FlightServiceStub(channel)
        schema = None
        async for data in stub.DoGet(Flight_pb2.Ticket(ticket=ticket.ticket)):
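            # FlightData carries the IPC metadata (data_header) and body (data_body)
            # but not the 8-byte prefix of the IPC stream format, so rebuild it.
            # 4 bytes: IPC continuation token
            token = b'\xff\xff\xff\xff'
            # 4 bytes: metadata length (little-endian)
            length = struct.pack('<I', len(data.data_header))
            buf = pa.py_buffer(token + length + data.data_header + data.data_body)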
            message = pa.ipc.read_message(buf)
            print(message)
            if schema is None:
                # Passing the Message should work, but was not implemented at the time of writing:
                # print(pa.ipc.read_schema(message))
                schema = pa.ipc.read_schema(buf)
                print(schema)
            else:
                batch = pa.ipc.read_record_batch(message, schema)
                print(batch)
                print(batch.to_pydict())

asyncio.run(main())

Server:

import pyarrow.flight as flight
import pyarrow as pa

class TestServer(flight.FlightServerBase):
    def do_get(self, context, ticket):
        table = pa.table([[1,2,3,4]], names=["a"])
        return flight.RecordBatchStream(table)

TestServer("grpc://localhost:1234").serve()

There's some discussion about async Flight APIs; please join the Arrow dev@ mailing list if you would like to chime in.

li.davidm
  • thanks a lot. It would be awesome if flight and plain grpc can somehow interoperate on the python side (e.g. serve both with the same grpc server, access both endpoints through the same grpc channel preferably via `grpc.aio` interfaces, etc.). IMHO, it could open up a broader category of application scenarios (and potentially reduce the burden to maintain a dedicated FlightServerBase). If this is indeed on the dev roadmap, I would very much like to help make it happen. – liuyu Jan 08 '22 at 06:16
  • This is not easily possible AFAIK, unless you have insider access to the gRPC team at Google, since the Python and C++ gRPC implementations are not quite compatible (and grpcio does not expose its Cython internals anyways). – li.davidm Jan 10 '22 at 13:01