
Is it expected that multiple IPC stream readers can concurrently tail a single stream writer publishing from another process? The descriptions of things like "IPC streams" lead me to think yes, but I cannot find any positive confirmation in the docs, and I don't see anything obvious in the source code aside from a std::mutex protecting concurrent writes in the C++ writer process.

Asking because I have a C++ producer (Arrow version 10.0.0) using arrow::ipc::MakeStreamWriter(arrow::io::FileOutputStream) to write the data, and a Python consumer (Arrow v12) reading the stream using pa.ipc.open_stream(f)... Most of the time, things flow through without issue, but occasionally the reader sees corrupt record batches that yield errors like the one below. A re-read of the stream yields the correct data, which leads me to think this is a concurrency race condition.

_logger.exception(f'error writing batch {table_from_batches.to_pandas()}')
  File "pyarrow/array.pxi", line 837, in pyarrow.lib._PandasConvertible.to_pandas
  File "pyarrow/table.pxi", line 2448, in pyarrow.lib.RecordBatch._to_pandas
  File "pyarrow/table.pxi", line 4114, in pyarrow.lib.Table._to_pandas
  File "../.venv/lib/python3.8/site-packages/pyarrow/pandas_compat.py", line 820, in table_to_blockmanager
    blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
  File "../.venv/lib/python3.8/site-packages/pyarrow/pandas_compat.py", line 1168, in _table_to_blocks
    result = pa.lib.table_to_blocks(options, block_table, categories,
  File "pyarrow/table.pxi", line 2771, in pyarrow.lib.table_to_blocks
  File "pyarrow/error.pxi", line 127, in pyarrow.lib.check_status
pyarrow.lib.ArrowIndexError: Index 80 out of bounds
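
For reference, a single-process Python-only sketch of the two APIs involved (the real producer is C++ using arrow::ipc::MakeStreamWriter, as noted above; the file name, schema, and data are invented for the example):

    import pyarrow as pa

    schema = pa.schema([("x", pa.int64())])

    # Writer side (stands in for the C++ arrow::ipc::MakeStreamWriter producer)
    with pa.OSFile("stream.arrow", "wb") as sink:
        with pa.ipc.new_stream(sink, schema) as writer:
            writer.write_batch(
                pa.record_batch([pa.array([1, 2, 3])], schema=schema))

    # Reader side, as in the consumer described above
    with pa.OSFile("stream.arrow", "rb") as f:
        for batch in pa.ipc.open_stream(f):
            print(batch)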
kdkavanagh
    I don't know if multiple readers is the concern as much as reading and writing concurrently. It sounds like you are writing to a file from one thread/process and reading from that same file in another thread/process while the file is being written? – Pace Aug 01 '23 at 14:11
  • Correct, mostly curious about the concurrency of the writer process vs 1+ readers in separate process(es) – kdkavanagh Aug 01 '23 at 14:27

1 Answer


Arrow-C++ does not support reading from an Arrow IPC file while it is being written. The writer does not atomically write an entire message in one operation, so if you read the file while a message is being written, you risk reading a partial message, which the reader cannot handle.

That being said, an Arrow SPMC (single-producer, multiple-consumer) message queue sounds like a pretty cool idea, and the IPC streaming format should be suitable for this (to the best of my knowledge). It should be possible to write a batch and then, once the write is done, communicate to your readers via some other channel that the data is available. Multiple readers can then consume it concurrently.
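
For illustration, here is a minimal sketch of that idea in Python with a single reader. The multiprocessing.Queue signalling channel, file path, and schema are choices made for the example, not anything prescribed by Arrow:

    import multiprocessing as mp
    import pyarrow as pa

    SCHEMA = pa.schema([("x", pa.int64())])

    def producer(path, ready):
        with pa.OSFile(path, "wb") as sink:
            with pa.ipc.new_stream(sink, SCHEMA) as writer:
                for i in range(3):
                    writer.write_batch(
                        pa.record_batch([pa.array([i, i + 1])], schema=SCHEMA))
                    sink.flush()   # the whole IPC message is now on disk
                    ready.put(i)   # out-of-band signal: batch i is safe to read
        ready.put(None)            # signal: stream complete

    def consumer(path, ready):
        source, reader = None, None
        while ready.get() is not None:   # block until the writer signals a batch
            if reader is None:
                # the file and its schema header exist once batch 0 is signalled
                source = pa.OSFile(path, "rb")
                reader = pa.ipc.open_stream(source)
            print(reader.read_next_batch().to_pydict())
        if source is not None:
            source.close()

    if __name__ == "__main__":
        q = mp.Queue()
        p = mp.Process(target=producer, args=("/tmp/stream.arrow", q))
        c = mp.Process(target=consumer, args=("/tmp/stream.arrow", q))
        p.start(); c.start(); p.join(); c.join()

The reader only ever touches bytes the writer has already flushed, which avoids the partial-message reads described above; fanning this out to multiple readers would just mean one signalling channel per reader.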

Pace
  • Much appreciated... Yes, would be super helpful. Typically the pattern to do this in the low-latency world is to write the message payload to the shared buffer, then go back and do a volatile/aligned write of the message header indicating the number of bytes available to be read by readers. The consumer spins on 0 bytes (since the message header hasn't yet been written) until the writer writes the payload + the final message header, which releases the hounds, so to speak, to consume the newly written payload – kdkavanagh Aug 01 '23 at 23:20
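
For anyone who wants to experiment with that pattern, here is a rough Python illustration using multiprocessing.shared_memory. Real implementations rely on atomic/aligned stores and memory fences, which Python does not guarantee, so this only sketches the write ordering (payload first, length header last):

    import struct
    from multiprocessing import shared_memory

    HEADER = 8  # bytes reserved up front for the message-length field

    def publish(shm, payload: bytes):
        shm.buf[HEADER:HEADER + len(payload)] = payload     # 1. write the payload
        shm.buf[:HEADER] = struct.pack("<q", len(payload))  # 2. header last: readers released

    def consume(shm) -> bytes:
        while True:
            (n,) = struct.unpack("<q", bytes(shm.buf[:HEADER]))
            if n:   # spin while the header still reads 0 bytes
                return bytes(shm.buf[HEADER:HEADER + n])

    shm = shared_memory.SharedMemory(create=True, size=4096)  # zero-filled on creation
    try:
        publish(shm, b"freshly written record batch bytes")
        print(consume(shm))
    finally:
        shm.close()
        shm.unlink()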