
I'm very new to this, so I'm not sure whether this script could be simplified or whether I'm doing something wrong that is causing the behavior described below. I've written an ETL script for AWS Glue that writes to a directory within an S3 bucket.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# catalog: database and table names
db_name = "events"
tbl_base_event_info = "base_event_info"
tbl_event_details = "event_details"

# output directories
output_dir = "s3://whatever/output"

# create dynamic frames from source tables
base_event_source = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = tbl_base_event_info)
event_details_source = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = tbl_event_details)

# join frames
base_event_source_df = base_event_source.toDF()
event_details_source_df = event_details_source.toDF()
enriched_event_df = base_event_source_df.join(event_details_source_df, "event_id")
enriched_event = DynamicFrame.fromDF(enriched_event_df, glueContext, "enriched_event")

# write frame to json files 
datasink = glueContext.write_dynamic_frame.from_options(frame = enriched_event, connection_type = "s3", connection_options = {"path": output_dir}, format = "json")
job.commit()

The base_event_info table has 4 columns: event_id, event_name, platform, client_info. The event_details table has 2 columns: event_id, event_details.

The joined table schema should look like: event_id, event_name, platform, client_info, event_details

After running this job, I expected to get 2 JSON files, since that's how many records there are in the resulting joined table (there are two records in the tables with the same event_id). However, what I get is about 200 files named run-1540321737719-part-r-00000, run-1540321737719-part-r-00001, and so on:

  • 198 files contain 0 bytes
  • 2 files contain 250 bytes (each with the correct info corresponding to the enriched events)

Is this the expected behavior? Why is this job generating so many empty files? Is there something wrong with my script?

ak2

3 Answers

5

The Spark SQL module contains the following default configuration:

spark.sql.shuffle.partitions set to 200.

That's why you are getting 200 files in the first place. You can check whether this is the case with the following:

enriched_event_df.rdd.getNumPartitions()

If you get a value of 200, you can change it to the number of files you want to generate with the following code:

enriched_event_df = enriched_event_df.repartition(2)

Note that repartition returns a new DataFrame rather than modifying the existing one, so the result has to be assigned back (or chained into the write). With only two partitions, only two files containing your data will be written.
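
To make this concrete, here is a rough, untested sketch of how the fix could slot into the script from the question (either lowering the shuffle-partition setting or repartitioning the joined DataFrame should be enough on its own):

# Option 1: lower the shuffle-partition count before the join, so the join itself only produces two partitions
spark.conf.set("spark.sql.shuffle.partitions", "2")

# Option 2: repartition (or coalesce) the joined DataFrame before converting it back to a DynamicFrame
enriched_event_df = base_event_source_df.join(event_details_source_df, "event_id").repartition(2)
enriched_event = DynamicFrame.fromDF(enriched_event_df, glueContext, "enriched_event")

datasink = glueContext.write_dynamic_frame.from_options(frame = enriched_event, connection_type = "s3", connection_options = {"path": output_dir}, format = "json")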

Aida Martinez
0

In my experience, empty output files point to an error in the transformations. You can debug these using the error functions.

By the way, why are you doing the join with Spark DataFrames instead of DynamicFrames?
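
For what it's worth, staying with DynamicFrames end to end could look roughly like the sketch below (untested; it reuses the source frames from the question and assumes the "error functions" meant here are DynamicFrame's errorsCount and errorsAsDynamicFrame):

from awsglue.transforms import Join

# Join the two catalog tables directly as DynamicFrames
enriched_event = Join.apply(base_event_source, event_details_source, 'event_id', 'event_id')

# Check whether any records failed during the transformations
print(enriched_event.errorsCount())                    # total number of error records
error_records = enriched_event.errorsAsDynamicFrame()  # the failed records themselves, if any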

-1

Instead of repartitioning, you can add a column such as a timestamp to the DataFrame through a Spark SQL transformation step and use it as a partition key when writing the DataFrame to S3.

For example: select replace(replace(replace(string(date_trunc('HOUR',current_timestamp())),'-',''),':',''),' ','') as datasetdate, * from myDataSource;

Use datasetdate as the partition key when writing the DynamicFrame; the Glue job should be able to add the partitions automatically.
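
Wired into the script from the question, that could look roughly like the sketch below (untested; note that the partition key goes into connection_options as "partitionKeys"):

# Stamp every row with a truncated-timestamp column via Spark SQL
enriched_event_df.createOrReplaceTempView("myDataSource")
stamped_df = spark.sql("select replace(replace(replace(string(date_trunc('HOUR', current_timestamp())),'-',''),':',''),' ','') as datasetdate, * from myDataSource")

# Convert back to a DynamicFrame and write, partitioned by the new column
stamped = DynamicFrame.fromDF(stamped_df, glueContext, "stamped")
glueContext.write_dynamic_frame.from_options(frame = stamped, connection_type = "s3", connection_options = {"path": output_dir, "partitionKeys": ["datasetdate"]}, format = "json")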

Jacopo Sciampi