19

I'm trying to overwrite my parquet files with pyarrow that are in S3. I've seen the documentacion and I haven't found anything.

Here is my code:

from s3fs.core import S3FileSystem
import pyarrow as pa
import pyarrow.parquet as pq

s3 = S3FileSystem(anon=False)
output_dir = "s3://mybucket/output/my_table"

my_csv = pd.read_csv(file.csv)
my_table = pa.Table.from_pandas(my_csv , preserve_index=False)

pq.write_to_dataset(my_table, 
                    output_dir,
                    filesystem=s3,
                    use_dictionary=True,
                    compression='snappy')

Is there something like mode = "overwrite" option in write_to_dataset function?

Andrew Gaul
  • 2,296
  • 1
  • 12
  • 19
Mateo Rod
  • 544
  • 2
  • 6
  • 14

3 Answers3

2

I think the best way to do it is with AWS Data Wrangler that offers 3 differents write modes:

  1. append
  2. overwrite
  3. overwrite_partitions

Example:

import awswrangler as wr

wr.s3.to_parquet(
    dataframe=df,
    path="s3://...",
    mode="overwrite",
    dataset=True,
    database="my_database",  # Optional, only with you want it available on Athena/Glue Catalog
    table="my_table",
    partition_cols=["PARTITION_COL_NAME"])
Igor Tavares
  • 869
  • 11
  • 8
2

Here's a solution using pyarrow.parquet (need version 8+! see docs regarding arg: "existing_data_behavior") and S3FileSystem.

Now decide if you want to overwrite partitions or parquet part files which often compose those partitions.

Overwrite single .parquet file

pq.write_to_dataset(
    my_table, 
    root_path='bucket/mydata/year=2022/data_part001.parquet',
    filesystem=s3,
    existing_data_behavior="overwrite_or_ignore"
)

Overwrite .parquet files with common basename within each partition

pq.write_to_dataset(
    my_table, 
    root_path='bucket/mydata',
    partition_cols=['year'],
    basename_template='data_part001.parquet',
    filesystem=s3,
    existing_data_behavior="overwrite_or_ignore"
)

Overwriting existing partitions that match new records

If some of your new records belong to a partition that already exists, that entire partition will be overwritten and new partitions will be added with:

pq.write_to_dataset(
    my_table,
    root_path='bucket/mydata',
    partition_cols=['year'],
    filesystem=s3,
    existing_data_behavior="delete_matching"
)
Wassadamo
  • 1,176
  • 12
  • 32
0

Sorry, there's no a such option yet but the way I work around it is using boto3 to delete the files before writing them.

import boto3
resource = boto3.resource('s3')
resource.Bucket('mybucket').objects.filter(Prefix='output/my_table').delete()
Amin
  • 763
  • 7
  • 22