S3 hosts a very large compressed file (20 GB compressed -> 200 GB uncompressed). I want to read this file in (unfortunately it decompresses on a single core), transform some SQL columns, and then write the output to S3 in the s3_path/year=2020/month=01/day=01/[files 1-200].parquet format.

The entire file consists of data from the same date. This leads me to believe that, instead of using partitionBy('year','month','day'), I should append "year={year}/month={month}/day={day}/" to the S3 path, because currently Spark is writing a single file at a time to S3 (1 GB each). Is my thinking correct?

Here is what I'm doing currently:

from pyspark.sql.functions import lit

# add partition columns derived from the file's date
df = df\
    .withColumn('year', lit(datetime_object.year))\
    .withColumn('month', lit(datetime_object.month))\
    .withColumn('day', lit(datetime_object.day))

df\
    .write\
    .partitionBy('year', 'month', 'day')\
    .parquet(s3_dest_path, mode='overwrite')

What I'm thinking:

from pyspark.sql.types import StructType

df = spark.read.format('json')\
    .load(s3_file, schema=StructType.fromJson(my_schema))\
    .repartition(200)
# currently takes a long time decompressing the 20gb s3_file.json.gz
# (gzip is not splittable, so it is read on a single core)

# transform

df.write\
    .parquet(s3_dest_path + 'year={}/month={}/day={}/'.format(year, month, day))

1 Answer

You're probably running into the problem that Spark first writes the data to a _temporary directory and only then commits it to the final location. On HDFS this commit is done with a rename. S3, however, does not support renames; instead the data is copied in full (using only one executor). For more on this topic see for example this post: Extremely slow S3 write times from EMR/ Spark
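As a rough sketch of one mitigation (not guaranteed to apply to your Spark/Hadoop versions), you can switch to version 2 of the Hadoop FileOutputCommitter algorithm, which lets tasks move their output at task commit instead of doing a second job-level rename. The app name below is just a placeholder:

from pyspark.sql import SparkSession

# sketch: use the v2 output committer algorithm, which skips the
# job-level rename of every task's files
spark = SparkSession.builder\
    .appName('gz-to-parquet')\
    .config('spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version', '2')\
    .getOrCreate()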

A common workaround is to write the output to HDFS first and then use DistCp (s3-dist-cp on EMR) to copy it from HDFS to S3 in a distributed fashion.
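A minimal sketch of that workaround, assuming the job runs on an EMR cluster where s3-dist-cp is available (the HDFS and S3 paths below are placeholders):

# write the parquet files to HDFS first, including the date partition in the path
hdfs_tmp_path = 'hdfs:///tmp/output/year={}/month={}/day={}/'.format(year, month, day)
df.write.parquet(hdfs_tmp_path, mode='overwrite')

# then copy to S3 in parallel, e.g. from the master node:
#   s3-dist-cp --src hdfs:///tmp/output/ --dest s3://my-bucket/my-prefix/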
