
I found a similar question here: Read CSV with PyArrow

That answer references sys.stdin.buffer and sys.stdout.buffer, but I am not exactly sure how they would be used to write the .arrow file, or how to name it. I can't seem to find the exact information I am looking for in the pyarrow docs. My file will not have any NaNs, but it will have a timestamped index. The file is ~100 GB, so loading it into memory simply isn't an option. I tried changing the code, but, as I assumed, it ended up overwriting the previous file on every loop.

This is my first post. I would like to thank all the contributors who answered 99.9% of my other questions before I had even asked them.

import sys

import pandas as pd
import pyarrow as pa

SPLIT_ROWS = 1     ### used one line chunks for a small test

def main():
    writer = None
    for split in pd.read_csv(sys.stdin.buffer, chunksize=SPLIT_ROWS):

        table = pa.Table.from_pandas(split)
        # Write out to file: reopening 'test.arrow' in 'wb' mode on every
        # iteration truncates it, so each chunk overwrites the previous one
        with pa.OSFile('test.arrow', 'wb') as sink:     ### no append mode yet
            with pa.RecordBatchFileWriter(sink, table.schema) as writer:
                writer.write_table(table)
    writer.close()

if __name__ == "__main__":
    main()

Below is the command I used on the command line:

>cat data.csv | python test.py
kasbah512
  • Can you look at https://stackoverflow.com/questions/68555085/how-can-i-chunk-through-a-csv-using-arrow/68563617#68563617 and see if that helps? It shows how to do this with a parquet file. Doing this with a .arrow file should be pretty much the same process. Open your writer once and then you can call write_table to it many times (once per chunk) and then you can close it at the end. – Pace Oct 18 '21 at 08:10
  • Your example is quite close too. I think the thing you would want to change is to create the `RecordBatchFileWriter` outside the for loop. You can then call `write_table` many times on the same instance. Then you can close it after the for loop just like you are already doing. – Pace Oct 18 '21 at 08:13
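
For reference, a minimal sketch of the chunked-Parquet approach described in the linked answer could look like the following; the file names and chunk size here are assumptions, but the pattern is exactly what Pace describes: open the writer once, call write_table once per chunk, and close it at the end.

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

CHUNK_ROWS = 1_000_000   # assumed chunk size

writer = None
for chunk in pd.read_csv('data.csv', chunksize=CHUNK_ROWS):
    table = pa.Table.from_pandas(chunk)
    if writer is None:
        # open the writer once, using the first chunk's schema
        writer = pq.ParquetWriter('data.parquet', table.schema)
    writer.write_table(table)   # one call per chunk

if writer is not None:
    writer.close()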

3 Answers


As suggested by @Pace, you should consider moving the output file creation outside of the reading loop. Something like this:

import sys

import pandas as pd
import pyarrow as pa

SPLIT_ROWS = 1     ### used one line chunks for a small test

def main():
    # The schema must be known before the writer is created; infer it from an
    # initial read of the CSV (this assumes consistent dtypes throughout the file)
    schema = pa.Table.from_pandas(pd.read_csv('data.csv', nrows=SPLIT_ROWS)).schema

    # Write out to file
    with pa.OSFile('test.arrow', 'wb') as sink:     ### no append mode yet
        with pa.RecordBatchFileWriter(sink, schema) as writer:
            for split in pd.read_csv('data.csv', chunksize=SPLIT_ROWS):
                table = pa.Table.from_pandas(split)
                writer.write_table(table)

if __name__ == "__main__":
    main()        

You also don't have to use sys.stdin.buffer if you would rather specify the input and output files directly in the script. You could then just run the script as:

python test.py

By using with statements, both writer and sink will be automatically closed afterwards (in this case when main() returns). This means it should not be necessary to include an explicit close() call.
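
A quick way to sanity-check the result, assuming the test.arrow file name used above, is to memory-map it and read it back one record batch at a time, so even a very large file never has to fit in memory:

import pyarrow as pa

# memory-map the Arrow IPC file and inspect it batch by batch
with pa.memory_map('test.arrow', 'r') as source:
    reader = pa.ipc.open_file(source)
    print(reader.schema)
    print('record batches:', reader.num_record_batches)
    first_batch = reader.get_batch(0)   # reads a single batch, not the whole file
    print('rows in first batch:', first_batch.num_rows)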

Martin Evans
    I went with this solution shown here. The schema needed to be defined before writer could write the file so I read the first two lines and defined it before looping through. It does assume consistent datatypes, but for my purposes, that will do just fine. – kasbah512 Oct 19 '21 at 13:36

Solution adapted from @Martin-Evans' code above, with the file closed after the for loop as suggested by @Pace:

import sys

import pandas as pd
import pyarrow as pa

SPLIT_ROWS = 1000000

def main():
    schema = pa.Table.from_pandas(pd.read_csv('Data.csv', nrows=2)).schema
    ### reads the first two lines to define the schema

    with pa.OSFile('test.arrow', 'wb') as sink:
        with pa.RecordBatchFileWriter(sink, schema) as writer:
            for split in pd.read_csv('Data.csv', chunksize=SPLIT_ROWS):
                table = pa.Table.from_pandas(split)
                writer.write_table(table)

            writer.close()

if __name__ == "__main__":
    main()   
kasbah512
    Note: The use of `with` statements here is to ensure `sink` and `writer` are automatically closed afterwards, an explicit `close()` should not be needed. – Martin Evans Oct 19 '21 at 13:57

In 2023 you don't need pandas for this; you can chunk through the CSV with pyarrow directly:

import pyarrow as pa
from pyarrow import csv

# explicit schema for the Arrow file
schema = pa.schema([
    ('time', pa.timestamp('ms', None)),
    ('deviceid', pa.utf8())
])

# matching column types for the CSV reader
convert_options = csv.ConvertOptions(
    column_types={
        'time': pa.timestamp('ms', None),
        'deviceid': pa.utf8()
    },
    strings_can_be_null=True,
    quoted_strings_can_be_null=True,
    timestamp_parsers=["%Y-%m-%d %H:%M:%S"],
)

arrowfile = "data_dst.arrow"
csvfile = "data_src.csv"

with pa.OSFile(arrowfile, 'wb') as sink:     ### no append mode yet
    with csv.open_csv(csvfile, convert_options=convert_options) as reader:
        with pa.RecordBatchFileWriter(sink, schema) as writer:
            for next_chunk in reader:
                # each chunk is a RecordBatch; wrap it in a Table to write it
                next_table = pa.Table.from_batches([next_chunk])
                writer.write_table(next_table)
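
If the default chunk size is not convenient, pyarrow's csv.ReadOptions exposes a block_size parameter (in bytes) that controls roughly how much of the CSV is read per batch; the 64 MB value below is only an illustrative assumption, and the snippet reuses schema, convert_options, arrowfile and csvfile from the code above.

from pyarrow import csv

# read roughly 64 MB of the CSV per chunk (block_size is in bytes)
read_options = csv.ReadOptions(block_size=64 * 1024 * 1024)

with pa.OSFile(arrowfile, 'wb') as sink:
    with csv.open_csv(csvfile,
                      read_options=read_options,
                      convert_options=convert_options) as reader:
        with pa.RecordBatchFileWriter(sink, schema) as writer:
            for next_chunk in reader:
                writer.write_table(pa.Table.from_batches([next_chunk]))
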
martin