
What I am trying to do

I am using PyArrow to read some CSVs and convert them to Parquet. Some of the files I read have many columns and a high memory footprint (enough to crash the machine running the job). I am trying to chunk through the file while reading the CSV, similar to how Pandas read_csv with chunksize works.

For example this is how the chunking code would work in pandas:

import pandas

chunks = pandas.read_csv(data, chunksize=100, iterator=True)

# Iterate through chunks
for chunk in chunks:
    do_stuff(chunk)

I want to port similar functionality to Arrow.

What I have tried to do

I noticed that Arrow has ReadOptions which include a block_size parameter, and I thought maybe I could use it like:

import pyarrow.csv as arrow_csv

# Reading in-memory csv file
arrow_table = arrow_csv.read_csv(
    input_file=input_buffer,
    read_options=arrow_csv.ReadOptions(
        use_threads=True,
        block_size=4096
    )
)

# Iterate through batches
for batch in arrow_table.to_batches():
    do_stuff(batch)

As read_csv does not return an iterator (even with block_size set), I am under the impression that Arrow will still read the entire table into memory and thus recreate my problem.

Lastly, I am aware that I could first read the CSV with Pandas, chunk through it, and then convert each chunk to an Arrow table. But I am trying to avoid using Pandas and only use Arrow.
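Roughly, the Pandas-based workaround I am trying to avoid would look like this (the file name and chunk size are only placeholders, and do_stuff is the same stand-in as above):

import pandas
import pyarrow as pa

# Read the CSV in bounded chunks with pandas, then convert each chunk to an Arrow table
for chunk_df in pandas.read_csv("data.csv", chunksize=100_000):
    arrow_table = pa.Table.from_pandas(chunk_df)
    do_stuff(arrow_table)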

I am happy to provide additional information if needed.

alt-f4
  • If you just want to convert some CSVs to parquet, try this nice Rust-based CLI tool: [csv2parquet](https://github.com/domoritz/csv2parquet). I found it much easier than messing around trying to get pyarrow to play nicely. – daviewales May 11 '22 at 02:30

1 Answer


The function you are looking for is pyarrow.csv.open_csv, which returns a pyarrow.csv.CSVStreamingReader. The size of the batches is controlled by the block_size option you noticed.
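As a minimal sketch (the path and block size here are just placeholders), the block_size you already found plugs straight into open_csv through the same ReadOptions:

import pyarrow.csv

# Each iteration yields a pyarrow.RecordBatch covering roughly block_size bytes of the file
read_options = pyarrow.csv.ReadOptions(block_size=1 << 20)  # ~1 MB per batch

with pyarrow.csv.open_csv('data.csv', read_options=read_options) as reader:
    for batch in reader:
        do_stuff(batch)

For a complete example: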

import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.csv

in_path = '/home/pace/dev/benchmarks-proj/benchmarks/data/nyctaxi_2010-01.csv.gz'
out_path = '/home/pace/dev/benchmarks-proj/benchmarks/data/temp/iterative.parquet'

convert_options = pyarrow.csv.ConvertOptions()
convert_options.column_types = {
    'rate_code': pa.utf8(),
    'store_and_fwd_flag': pa.utf8()
}

writer = None
with pyarrow.csv.open_csv(in_path, convert_options=convert_options) as reader:
    for next_chunk in reader:
        if next_chunk is None:
            break
        if writer is None:
            # Create the output file lazily, using the schema of the first batch
            writer = pq.ParquetWriter(out_path, next_chunk.schema)
        # Wrap the record batch in a table so it can be handed to the parquet writer
        next_table = pa.Table.from_batches([next_chunk])
        writer.write_table(next_table)
if writer is not None:
    writer.close()

This example also highlights one of the challenges the streaming CSV reader introduces: it needs to return batches with consistent data types, but when parsing CSV you typically need to infer those types. In my example data the first few MB of the file have integral values for the rate_code column, and somewhere in the middle of the file there is a non-integer value (* in this case) in that column. To work around this you can specify the column types up front, as I am doing here with ConvertOptions.column_types.

Pace
  • Hi Pace! I have implemented the approach but ran across a different error. Decided to cover it as a separate question [here](https://stackoverflow.com/questions/68652157/how-do-i-debug-overflowerror-value-too-large-to-convert-to-int32-t) if you have time = ) – alt-f4 Aug 04 '21 at 13:31
  • Hi @Pace, I have one question. How is it possible that ParquetWriter.write_table is not simply overwriting the `out_path` file with the latest chunk? Is `writer` opened in 'append' mode? I'm newish to the parquet format and I can't find any info in the docs – ABaron Nov 07 '22 at 09:59
  • @ABaron The example creates one instance of `ParquetWriter` that lives for the lifetime of the scan. A file is opened (once) when the `ParquetWriter` instance is created. Each call to `write_table` adds more data to that existing open file. At the end, when `writer.close` is called, the file is closed. So there is no need for an append in this case, though the parquet file would not be readable until the entire loop had run and the file had been closed. – Pace Nov 07 '22 at 18:40
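To illustrate the behavior described in that last comment, here is a minimal self-contained sketch (the file name example.parquet and the tiny table are made up): each write_table call appends row groups to the same open file, and the data only becomes readable once the writer is closed.

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({'x': [1, 2, 3]})

# One writer instance, one output file; each write_table call appends a row group
writer = pq.ParquetWriter('example.parquet', table.schema)
for _ in range(3):
    writer.write_table(table)
writer.close()

# All nine rows (3 calls x 3 rows) are present in the single file
print(pq.read_table('example.parquet').num_rows)  # 9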