
I have an AWS Glue job whose task is very simple: split large gzipped CSV files into roughly 1 GB pieces.

In my test, I uploaded 4 files to the bucket, each around 5 GB. Yet the job always assigns all the files to a single worker instead of distributing them across all workers.

The log from the active worker:

[Executor task launch worker for task 3] s3n.S3NativeFileSystem (S3NativeFileSystem.java:open(1323)): Opening 's3://input/IN-4.gz' for reading
[Executor task launch worker for task 0] s3n.S3NativeFileSystem (S3NativeFileSystem.java:open(1323)): Opening 's3://input/IN-1.gz' for reading
[Executor task launch worker for task 2] s3n.S3NativeFileSystem (S3NativeFileSystem.java:open(1323)): Opening 's3://input/IN-3.gz' for reading
[Executor task launch worker for task 1] s3n.S3NativeFileSystem (S3NativeFileSystem.java:open(1323)): Opening 's3://input/IN-2.gz' for reading
[Executor task launch worker for task 0] zlib.ZlibFactory (ZlibFactory.java:<clinit>(49)): Successfully loaded & initialized native-zlib library

The log from one of the remaining workers:

storage.BlockManager (Logging.scala:logInfo(54)): Initialized BlockManager: BlockManagerId(3, 172.31.0.109, 35849, None)

The remaining workers are stuck at this step and wait endlessly, while all 20 GB of input files end up assigned to tasks on the single active worker.

The job script is below:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the gzipped CSV files via the crawled catalog table
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database = "crawled-database",
    table_name = "input",
    transformation_ctx = "datasource0",
    additional_options = {"groupFiles": "inPartition", "compressionType": "gzip"})

# Rename the two columns
applymapping1 = ApplyMapping.apply(
    frame = datasource0,
    mappings = [("tagids", "string", "internal_tagids", "string"),
                ("channel", "long", "internal_channel", "long")],
    transformation_ctx = "applymapping1")

# Write back to S3 as gzipped CSV, grouping output into ~1 GB files
datasink2 = glueContext.write_dynamic_frame.from_options(
    frame = applymapping1,
    connection_type = "s3",
    connection_options = {"path": "s3://glue-report-staging",
                          "groupFiles": "inPartition",
                          "groupSize": "1073741824",
                          "compression": "gzip"},
    format = "csv",
    transformation_ctx = "datasink2")
job.commit()
user1888955

1 Answer


The compression format of your files is the reason for the poor performance and the skew in parallelism. Gzipped files are not splittable, so each file must be decompressed from start to finish by a single task, which is why all of the work is allocated to one executor. If you were reading this in pure Spark, you could read the files as an RDD or DataFrame and then repartition into smaller tasks/partitions.
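A minimal sketch of that pure-Spark approach, reusing the input and output paths from the question (the glob pattern and the partition count 32 are illustrative choices, not values from your job):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("split-gzip-csv").getOrCreate()

# Each .gz file becomes exactly one partition because gzip is not splittable,
# so this read yields 4 partitions no matter how many executors are available.
df = spark.read.csv("s3://input/*.gz")

# Shuffle the rows into more partitions so the downstream work and the write
# run on all executors; the decompression itself still happens in the
# original 4 tasks.
df = df.repartition(32)

df.write.option("compression", "gzip").csv("s3://glue-report-staging/")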

A DynamicFrame has the same option: you can call datasource0.repartition(n), with n being the desired number of partitions. On the G.1X worker type you can run 8 tasks in parallel per worker, so size n according to the number of workers you have in order to maximize parallelism. You can find more on this in the AWS Glue documentation.
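Applied to the job script in the question, that would look roughly like this (the partition count 32 is an assumption; size it to roughly the number of workers times the tasks per worker on your cluster):

# Shuffle the 4 single-partition gzip reads into more partitions so that
# ApplyMapping and the write run on all workers, not just one.
repartitioned = datasource0.repartition(32)

applymapping1 = ApplyMapping.apply(
    frame = repartitioned,
    mappings = [("tagids", "string", "internal_tagids", "string"),
                ("channel", "long", "internal_channel", "long")],
    transformation_ctx = "applymapping1")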

Eman