
Hi, I have an ETL job in AWS Glue that takes a very long time to write. It reads data from S3, performs a few transformations (not all are listed below, but the transformations do not seem to be the issue), and finally writes the data frame back to S3. This write takes approximately 30 minutes for a file of about 20 MB, even when I'm using 10 workers (worker type G.1X). I have used print statements to see what takes time, and it appears to be the last operation of writing the file to S3. I have not had this issue before with the same kind of setup.

I'm using Glue version 3.0, Python version 3, and Spark version 3.1.

The source contains almost 50,000 files spread out over many folders, and new files are generated automatically every day. The average file size is about 10 KB.
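For scale, a quick back-of-the-envelope calculation with the approximate numbers above shows how little data this actually is, and why per-object overhead (S3 listing, GET requests, task scheduling) rather than raw volume tends to dominate with this layout:

```python
# Approximate numbers from the question above.
num_files = 50_000
avg_file_kb = 10

total_mb = num_files * avg_file_kb / 1024
print(f"~{total_mb:.0f} MB of JSON spread over {num_files:,} objects")

# Every S3 object costs a listing entry, a GET round trip, and scheduling
# overhead, so tens of thousands of tiny files read far slower than a few
# large ones holding the same total amount of data.
```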

Any suggestions on this issue?

#Imports
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark import SparkConf, SparkContext
from pyspark.sql import Window
from pyspark.sql.functions import lag

#Glue context & spark session
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

#Solves the issues with old datetimes in the new version of Spark
spark_conf = SparkConf()
spark_conf.setAll([
    ('spark.sql.legacy.parquet.int96RebaseModeInRead', 'CORRECTED'),
    ('spark.sql.legacy.parquet.int96RebaseModeInWrite', 'CORRECTED'),
    ('spark.sql.legacy.parquet.datetimeRebaseModeInRead', 'CORRECTED'),
    ('spark.sql.legacy.parquet.datetimeRebaseModeInWrite', 'CORRECTED')
])

#GlueContext expects a SparkContext, not a SparkSession
sc = SparkContext.getOrCreate(conf=spark_conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session

#Source(/s) - create dynamic frame
dy = glueContext.create_dynamic_frame.from_options(
    format_options={"multiline": False},
    connection_type="s3",
    format="json",
    connection_options={
        "paths": [
            "s3://.../files/abc/"
        ],
        "recurse": True,
        "groupFiles": "inPartition"
    },
    transformation_ctx="dy",
)

df = dy.toDF()

#Transformation(/s)
#Note: DataFrame.sort takes an 'ascending' keyword ('descending' is silently
#ignored), and Window is used via its static methods, not instantiated
df_ready = df\
    .sort(['ID', 'timestamp'], ascending=False)\
    .withColumn("timestamp_prev",
                lag(df.timestamp)
                .over(Window.partitionBy("ID").orderBy('timestamp')))

df_ready.repartition(1).write.mode('overwrite').parquet("s3a://.../thisismywritefolder/df_ready/")
Qwaz
  • The reason you see the issue in "the last operation of writing the file to S3" is that Spark evaluates lazily: writing is an action that triggers all of the preceding processing. So the transformations you do still matter, and you should check whether there is a more optimized way to express them. Doing a repartition also reduces the parallel processing. – yogesh garud Mar 29 '22 at 08:57
  • 1
    Can you confirm the average file size in source and number of files in total? I believe this might be causing the job to run slow with repartitioning. Try also mentioning groupSize along with groupFiles. Also try adding useS3ListImplementation to see if it helps – Prabhakar Reddy Mar 30 '22 at 02:23
  • Hi @PrabhakarReddy. The source contains almost 50,000 files spread out over many folders, and new files are generated automatically every day. The average file size is about 10 KB, and as you can see from the code snippet above, I repartition to a single file before writing to S3. I'm new to Spark; could this be the issue? What would be an appropriate groupSize for this particular case? Thank you for your feedback! – Qwaz Mar 30 '22 at 06:58
  • Try adding a groupSize of 1 MB and see what happens. Also add useS3ListImplementation to see if it helps. – Prabhakar Reddy Mar 31 '22 at 01:14
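The options suggested in this comment thread can be expressed in the connection_options dict passed to create_dynamic_frame.from_options. A sketch under those suggestions (the S3 path stays elided as in the question; note that groupSize is given in bytes, as a string):

```python
# Sketch based on the comments above: group many small input files into
# roughly 1 MB chunks and use the batched S3 listing implementation.
connection_options = {
    "paths": ["s3://.../files/abc/"],      # path elided as in the question
    "recurse": True,
    "groupFiles": "inPartition",
    "groupSize": "1048576",                # target group size in bytes (1 MB)
    "useS3ListImplementation": True,       # batched listing, lighter on memory
}
```

With ~10 KB source files, a 1 MB groupSize would combine on the order of a hundred files per read task instead of one task per tiny file.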

1 Answer


You are repartitioning to a single partition at the end, which prevents Glue from writing in parallel: all the data is shuffled to one executor, and a single task writes the file. If you remove the repartition(1), you should see increased speeds while writing.

Robert Kossendey
  • And then you will end up with multiple small files which you can no longer query with e.g. Athena in a performant way. – luk2302 Mar 29 '22 at 07:43
  • That's why you should compact them on a regular basis. – Robert Kossendey Mar 29 '22 at 07:46
  • Hi @RobertKossendey. Thank you for your answer. Would it be better to not use repartition(1) and instead have a second ETL script that compacts the files into a single file that I can query with Athena and then finally use in a QuickSight analysis? That is my goal with the whole job. – Qwaz Mar 30 '22 at 07:17
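Following up on the compaction idea discussed above: rather than forcing repartition(1), a common approach is to pick a partition count that yields files near some target size, since Athena generally scans a handful of ~128 MB files more efficiently than one huge file or thousands of tiny ones. A minimal sketch of that sizing logic; the 128 MB target is an assumption for illustration, not something stated in the thread:

```python
def target_partitions(total_bytes: int,
                      target_file_bytes: int = 128 * 1024 * 1024) -> int:
    """Number of output files needed to keep each file near the target size."""
    # Ceiling division, with a floor of one partition for tiny datasets.
    return max(1, -(-total_bytes // target_file_bytes))

# e.g. ~500 MB of data would be written as 4 files of roughly 128 MB each,
# via something like: df_ready.repartition(target_partitions(total_bytes))...
print(target_partitions(500 * 1024 * 1024))  # -> 4
```

A compaction job run on a schedule could use this to rewrite the day's small outputs into a few right-sized Parquet files for Athena and QuickSight.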