5

I looked through the standard documentation that I would expect to cover this need (Apache Arrow and pandas), but I could not figure it out.

I know Python best, so I would like to use Python, but it is not a strict requirement.

Problem

I need to move Parquet files from one location (a URL) to another (an Azure storage account, in this case using the Azure machine learning platform, but this is irrelevant to my problem).

These files are too large to simply call pd.read_parquet("https://my-file-location.parquet"), since that reads the whole file into a single in-memory object.

Expectation

I thought that there must be a simple way to create a file object and stream that object line by line -- or maybe column chunk by column chunk. Something like

import pyarrow.parquet as pq

with pq.open("https://my-file-location.parquet") as read_file_handle:
    with pq.open("https://my-azure-storage-account/my-file.parquet", "write") as write_filehandle:
        for next_line in read_file_handle{
            write_file_handle.append(next_line)

I understand it will be a little different because Parquet is primarily meant to be accessed in a columnar fashion. Maybe there is some sort of config object I would pass that specifies the columns of interest, or how many rows can be grabbed in a chunk, or something similar.

But the key expectation is that there is a means to access a parquet file without loading it all into memory. How can I do this?

FWIW, I did try to just use Python's standard open function, but I was not sure how to use open with a URL location and a byte stream. If it is possible to do this via just open and skip anything Parquet-specific, that is also fine.

Update

Some of the comments have suggested using bash-like scripts, such as here. I can use this if there is nothing else, but it is not ideal because:

  • I would rather keep this all in a full language SDK, whether Python, Go, or whatever. If the solution moves into a bash script with pipes, it requires an external call, since the final solution will not be written entirely in bash, PowerShell, or any other scripting language.
  • I really want to leverage some of the benefits of Parquet itself. As I mentioned in the comment below, Parquet is columnar storage. So if I have a "data frame" that is 1.1 billion rows and 100 columns, but I only care about 3 columns, I would love to be able to only download those 3 columns, saving a bunch of time and some money, too.
Mike Williamson
  • Did you look at this? : [How to upload a file to directory in S3 bucket using boto](https://stackoverflow.com/questions/15085864/how-to-upload-a-file-to-directory-in-s3-bucket-using-boto) – ThePyGuy Aug 17 '21 at 15:10
  • Does this answer your question? [downloading a file from Internet into S3 bucket](https://stackoverflow.com/questions/19241671/downloading-a-file-from-internet-into-s3-bucket) – Be Chiller Too Aug 17 '21 at 15:17
  • Well, I am not using S3. And, more generally, I do not want to rely upon another filesystem which might be sufficiently widely known, but still not fully generalized like HTTPS. So I don't want to use S3, Databricks' DBFS, or Azure's DFS, etc. – Mike Williamson Aug 19 '21 at 10:05
  • Also, to be clear, the question is specifically about Parquet. I am _presuming_ two things: (1) if I treat it like a pure binary file and stream it somehow, this should work fine. So something like the `curl` answer that @BeChillerToo mentions should work. (2) Because it is Parquet, I should have the advantage to be able to do some simple processing on the fly while it is streaming. For instance, be able to grab a subset of the columns of interest to store instead of the entire object. I am _hoping_ this can actually be done at request, so needless network I/O is avoided. – Mike Williamson Aug 19 '21 at 10:08
  • If you want to do processing on the fly, that changes the calculus substantially; treating it like a pure binary is likely to be faster, possibly substantially faster, but only suitable for a straight copy with no processing – Jiří Baum Aug 30 '21 at 14:18
  • @MikeWilliamson were you able to resolve it? I am facing similar challenge but realized that ParquetFile wont accept url as a source, so any guidance on it? – av abhishiek May 29 '23 at 16:15
  • @avabhishiek , no, I never solved it. I worked around it instead. I don't remember, since it was a while ago, but I think my toy didn't quite capture the issue: I was doing more than just reading and immediately writing. There was some filtering or something. If I remember, I used a UDF to take care of it. UDF's use pandas behind the scenes, and they grab the data in chunks. Sorry I cannot remember more. – Mike Williamson May 31 '23 at 11:14

4 Answers

7

Great post. Based on @Micah's answer, here are my 2 cents, in case you don't want to read the docs. A small snippet is the following:

import pandas as pd
import numpy as np
from pyarrow.parquet import ParquetFile

# create a random df then save to parquet
df = pd.DataFrame({
    'A': np.arange(10000),
    'B': np.arange(10000),
    'C': np.arange(10000),
    'D': np.arange(10000),
})
df.to_parquet('./test/test')

# ****** below is @Micah Kornfield's answer ******
# 1. open the parquet file
parquet_file = ParquetFile('./test/test')
# 2. define a generator of record batches, reading only the columns of interest
batches = parquet_file.iter_batches(
    batch_size=10,
    columns=['B', 'C'],
)
# 3. yield pandas / native Python data
print(next(batches).to_pandas())  # pandas DataFrame
print(next(batches).to_pydict())  # native python dict
Zézouille
  • This script will crash if the `./test` directory does not exist. I'd suggest using `./test.parquet` as a target path instead. – mb7744 Mar 04 '22 at 19:22
5

This is possible but takes a little bit of work because, in addition to being columnar, Parquet also requires a schema.

The rough workflow is:

  1. Open a parquet file for reading.

  2. Then use iter_batches to read back chunks of rows incrementally (you can also pass specific columns you want to read from the file to save IO/CPU).

  3. You can then transform each pa.RecordBatch from iter_batches further. Once you are done transforming the first batch you can get its schema and create a new ParquetWriter.

  4. For each transformed batch call write_table. You have to first convert it to a pa.Table.

  5. Close the files.

Parquet requires random access, so it can't easily be streamed from a URI (pyarrow should support reading if you open the file via an fsspec HTTP filesystem), but I think you might get blocked on writes.
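
For illustration, here is a minimal sketch of the read/transform/write workflow above; the paths, column names, and batch size are placeholders, and the "transformation" step is just a pass-through:

import pyarrow as pa
import pyarrow.parquet as pq

# Placeholder paths; for an HTTP source you could instead pass a file-like
# object (e.g. from fsspec.open("https://...", "rb")) to ParquetFile.
parquet_file = pq.ParquetFile("source.parquet")

writer = None
try:
    # Read only the columns of interest, one chunk of rows at a time.
    for batch in parquet_file.iter_batches(batch_size=65536, columns=["a", "b", "c"]):
        # Transform the batch here as needed; this sketch passes it through.
        table = pa.Table.from_batches([batch])
        if writer is None:
            # Create the writer lazily from the (possibly transformed) schema
            # of the first batch.
            writer = pq.ParquetWriter("dest.parquet", table.schema)
        writer.write_table(table)
finally:
    if writer is not None:
        writer.close()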

Micah Kornfield
  • Also see https://stackoverflow.com/questions/63891231/pyarrow-incrementally-using-parquetwriter-without-keeping-entire-dataset-in-mem/64469365#64469365; batch sizing can be important for managing memory. – Micah Kornfield Aug 24 '21 at 06:00
  • Thank you, @Micah ! Yes, I knew it would be tricky and that a schema would be needed. I have found that on SO it is sometimes better to write shorter questions than longer ones, if I don't _really_ know what I am trying to ask. ;) Regardless, the `iter_batches` is *exactly* what I was looking for. I feel dumb not seeing it. I'll work on trying to put it in place. – Mike Williamson Aug 24 '21 at 07:56
2

Parquet to Parquet transformation without reading the whole file into memory

Building on the accepted answer in this thread,

import pyarrow as pa
import pyarrow.parquet as pq

parquet_file = pq.ParquetFile('read.parquet')
new_schema = pa.schema([
    ('a', pa.int32()),
    ('b', pa.int32()),
    ('c', pa.int32()),
])
# Alternatively, get the arrow schema from the parquet file instead of hard-coding it:
# arrow_schema = parquet_file.schema_arrow
with pq.ParquetWriter('write.parquet', schema=new_schema) as writer:
    # iter_batches lets you filter by certain columns or certain row groups as well
    for batch in parquet_file.iter_batches(batch_size=100000):
        df = batch.to_pandas()
        # transformation: transform df by adding a new static column with column name c and value 9999999
        df['c'] = 9999999
        # convert pandas df to record batch
        # schema will be inferred if not provided 
        transformed_batch = pa.RecordBatch.from_pandas(df, schema=new_schema)
        writer.write_batch(transformed_batch)

All the docs are linked in the accepted answer.


Note: do not make the batch size very small. That will result in poor compression, since the batch size also determines the row group size in the new Parquet file.

CSV to Parquet transformation without reading the whole file into memory

Check out this answer: https://stackoverflow.com/a/74258957/6563567
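
For reference, here is a minimal sketch of one way to stream a CSV into Parquet without loading the whole file, reading record batches with pyarrow.csv.open_csv; the paths are placeholders and this is not necessarily the exact code of the linked answer:

import pyarrow.csv as pv
import pyarrow.parquet as pq

# Placeholder paths. open_csv returns a streaming reader, so the whole CSV
# never has to fit in memory at once.
csv_reader = pv.open_csv("input.csv")

writer = None
try:
    while True:
        try:
            # read_next_batch raises StopIteration at the end of the file
            batch = csv_reader.read_next_batch()
        except StopIteration:
            break
        if writer is None:
            writer = pq.ParquetWriter("output.parquet", batch.schema)
        writer.write_batch(batch)
finally:
    if writer is not None:
        writer.close()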

ns15
0

Please note that I did not specify how the batches should be consumed on the remote server side.
My approach is to write each batch into a buffer (a pyarrow NativeFile) and then read the buffer back with pyarrow.ipc.RecordBatchFileReader.

I created these two classes to help with the streaming process:

import asyncio

import pyarrow as pa
from pyarrow.parquet import ParquetFile


class ParquetFileStreamer:
    """
    Attributes:
        ip_address: ip address of the distant server
        port: listening port of the distant server
        n_bytes: -1 means read whole batch
        file_source: pathlib.Path, pyarrow.NativeFile, or file-like object
        batch_size: default = 65536
        columns: list of the columns you wish to select (if None selects all)

    Example:
        >>> class MyStreamer(ParquetFileStreamer):
        ...     file_source = '/usr/fromage/camembert.parquet'
        ...     columns = ['name', 'price']
        >>> MyStreamer.start_stream()
    """
    ip_address = '192.168.1.1'
    port = 80
    n_bytes = -1

    file_source: str
    batch_size = 65536
    columns = None  # None selects all columns

    @classmethod
    def start_stream(cls):
        for batch in cls._open_parquet():
            asyncio.run(cls._stream_parquet(batch))

    @classmethod
    def _open_parquet(cls):
        return ParquetFile(cls.file_source).iter_batches(
            batch_size=cls.batch_size,
            columns=cls.columns
        )

    @classmethod
    async def _stream_parquet(cls, batch):
        reader, writer = await asyncio.open_connection(cls.ip_address, cls.port)
        # Serialize the record batch to the Arrow IPC file format so the
        # receiver can read it back with a RecordBatchFileReader.
        sink = pa.BufferOutputStream()
        with pa.ipc.new_file(sink, batch.schema) as ipc_writer:
            ipc_writer.write_batch(batch)
        writer.write(sink.getvalue().to_pybytes())
        await writer.drain()
        writer.write_eof()   # signal EOF so the receiver's read() returns
        await reader.read()  # wait for the receiver to close the connection
        writer.close()
        await writer.wait_closed()


class ParquetFileReceiver:
    """
    Attributes:
        port: listening port
        n_bytes: -1 reads the whole payload
    Example:
        >>> asyncio.run(ParquetFileReceiver.server())
    """
    port = 80
    n_bytes = -1

    @classmethod
    async def handle_stream(cls, reader, writer):
        data = await reader.read(cls.n_bytes)  # read until EOF from the sender
        # Deserialize the Arrow IPC payload back into a table.
        table = pa.ipc.RecordBatchFileReader(pa.BufferReader(data)).read_all()
        print(table)
        writer.close()  # close so the sender's read() returns
        await writer.wait_closed()

    @classmethod
    async def server(cls):
        server = await asyncio.start_server(cls.handle_stream, port=cls.port)
        async with server:
            await server.serve_forever()