
We are appending data to an existing partitioned Parquet dataset stored in S3 using pyarrow. This runs on AWS Lambda several times per hour. A minimal example:

import pyarrow as pa
import pyarrow.parquet as pq
import s3fs

df = ...  # Existing pandas DataFrame

s3 = s3fs.S3FileSystem()
table = pa.Table.from_pandas(df)

pq.write_to_dataset(
    table,
    filesystem=s3,
    root_path="s3://s3-path/",
    partition_cols=["year", "month"],
)

As a result, a number of Parquet files will be written to S3, the exact set depending on the data values. Our aim is to track which files have been written by outputting their resulting filenames (S3 keys).

Is there any way to capture the actual filenames written by pyarrow or s3fs? The Parquet files are named arbitrarily with a computed hash, and I do not see any logging functionality in either of the two packages mentioned.

jarias
  • What pyarrow version are you using? Starting with 0.15.0 you can provide names for your files before writing. https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_to_dataset.html – Prabhakar Reddy Dec 05 '19 at 05:57
  • Thank you! Worked like a charm, I had missed that new feature. If you wish to add the comment as an answer I will mark it as selected; otherwise I can answer it myself. – jarias Dec 10 '19 at 09:40
  • Will post the answer in some time. Thanks for the confirmation. – Prabhakar Reddy Dec 10 '19 at 09:41

3 Answers


Starting with 0.15.0 you can provide names for your files before writing via the partition_filename_cb argument.

pyarrow.parquet.write_to_dataset(table, root_path, partition_cols=None, partition_filename_cb=None, filesystem=None, **kwargs)
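A sketch of how this can answer the original question (the DataFrame df, the bucket/prefix, and the helper name below are placeholders, not part of the answer): the callback receives the partition key values for each file that is written, so you can both choose the filename and record it, then reconstruct the full S3 key from root_path plus the Hive-style partition directories.

import uuid

import pyarrow as pa
import pyarrow.parquet as pq
import s3fs

s3 = s3fs.S3FileSystem()
table = pa.Table.from_pandas(df)  # df: the existing pandas DataFrame

root_path = "s3://s3-path"  # placeholder, as in the question
written = []                # collected (partition values, filename) pairs

def name_partition_file(keys):
    # keys is a tuple with the partition values for this file, e.g. (2019, 12)
    filename = f"part-{uuid.uuid4().hex}.parquet"
    written.append((keys, filename))
    return filename

pq.write_to_dataset(
    table,
    root_path=root_path,
    partition_cols=["year", "month"],
    partition_filename_cb=name_partition_file,
    filesystem=s3,
)

# Reconstruct and log the resulting S3 keys,
# e.g. "s3://s3-path/year=2019/month=12/part-<hex>.parquet"
for keys, filename in written:
    dirs = "/".join(f"{col}={val}" for col, val in zip(["year", "month"], keys))
    print(f"{root_path}/{dirs}/{filename}")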
Prabhakar Reddy

If you are open to also using AWS Data Wrangler:

import awswrangler as wr

paths = wr.pandas.to_parquet(
    dataframe=df,
    path="s3://...",
    dataset=True,
    database="my_database",  # Optional, only if you want it available in the Athena/Glue Catalog
    table="my_table",
    partition_cols=["PARTITION_COL_NAME"],
)["paths"]

print(paths)
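This is the pre-1.0 AWS Data Wrangler API. In awswrangler 1.x and later the equivalent call is wr.s3.to_parquet, which likewise returns the written paths; a minimal sketch, assuming the same placeholder path and DataFrame:

import awswrangler as wr

result = wr.s3.to_parquet(
    df=df,
    path="s3://...",
    dataset=True,
    partition_cols=["PARTITION_COL_NAME"],
)
print(result["paths"])  # list of the S3 keys that were written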
Igor Tavares

Just to clarify @Prabhakar Reddy's answer: the partition_filename_cb argument requires a callback function. Simply use a lambda, as shown below, if you wish to provide a string.

pyarrow.parquet.write_to_dataset(table, root_path, partition_cols=None, partition_filename_cb=lambda x: 'myfilename.parquet', filesystem=None, **kwargs)
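One caveat, not part of the original answer: the callback is invoked once per partition with that partition's key values, so a constant name like myfilename.parquet produces the same key on every run, and a later append will overwrite the earlier file in that partition. A sketch of a callback that folds the partition values and a per-run identifier into the name (run_id here is a hypothetical variable):

import uuid

run_id = uuid.uuid4().hex  # hypothetical per-invocation identifier

pq.write_to_dataset(
    table,
    root_path,
    partition_cols=["year", "month"],
    partition_filename_cb=lambda keys: "-".join(map(str, keys)) + f"-{run_id}.parquet",
    filesystem=s3,
)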
user3496060