
I am running a PySpark job on EMR (5.5.1) with Spark 2.1.0, Hadoop 2.7.3, Hive 2.1.1, Sqoop 1.4.6 and Ganglia 3.7.2, which loads data from S3. There are multiple buckets containing the input files, so I have a function that uses boto to traverse them and filter the files according to some pattern.

Cluster size: Master => r4.xlarge, Workers => 3 x r4.4xlarge

Problem: The function getFilePaths returns a list of S3 paths which is fed directly to the Spark DataFrame load method.

Using DataFrame

file_list = getFilePaths() # ['s3://some_bucket/log.json.gz','s3://some_bucket/log2.json.gz']
schema = getSchema()  # for mapping to the json files
df = sparkSession.read.format('json').load(file_list, schema=schema)

Using RDD

master_rdd = sparkSession.sparkContext.union(
    map(lambda file: sparkSession.sparkContext.textFile(file), file_list)
)
df = sparkSession.createDataFrame(master_rdd, schema=schema)

The file_list can be huge (up to 500k files) due to the large amount of data. Calculating these paths takes only 5-20 minutes, but when I try to load them as a DataFrame with Spark, the Spark UI remains inactive for hours, i.e. it is not processing anything at all. The inactivity period is above 9 hours for 500k files, while for 100k files it is around 1.5 hours.

Viewing Ganglia metrics shows that only the driver is running/processing while the workers are idle. No logs are generated until the Spark job has finished, and I haven't had any success with 500k files.


I have tried the s3 and s3n connectors, but with no success.

Questions:

  • What is the root cause of this delay?
  • How can I debug it properly?
Abdul Mannan
  • Had some issues with Spark not reading files in parallel and went this route: https://stackoverflow.com/questions/28685874/pyspark-how-to-read-many-json-files-multiple-records-per-file/36483873#36483873 – David Jun 13 '18 at 17:31

2 Answers


In general, Spark/Hadoop prefer large files they can split rather than huge numbers of small files. One approach you might try, though, is to parallelize your file list and then load the data in a map call.

I don't have the resources right now to test this out, but it should be something similar to this:

import json
import boto3

file_list = getFilePaths()
schema = getSchema()  # for mapping to the json files

paths_rdd = sc.parallelize(file_list)

def get_data(path):
    # parse 's3://bucket/key' into bucket name and object key
    bucket, _, key = path.replace('s3://', '', 1).partition('/')
    s3 = boto3.resource('s3')

    obj = s3.Object(bucket, key)
    data = obj.get()['Body'].read().decode('utf-8')
    return [json.loads(r) for r in data.split('\n') if r]

rows_rdd = paths_rdd.flatMap(get_data)
df = spark.createDataFrame(rows_rdd, schema=schema)

You could also make this a little more efficient by using mapPartitions instead, so you don't need to recreate the s3 object for each file.
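A minimal sketch of that mapPartitions variant, reusing the bucket/key parsing from the snippet above (get_data_partition is an illustrative name, not part of the original code):

def get_data_partition(paths):
    # one boto3 resource per partition instead of one per file
    s3 = boto3.resource('s3')
    for path in paths:
        bucket, _, key = path.replace('s3://', '', 1).partition('/')
        body = s3.Object(bucket, key).get()['Body'].read().decode('utf-8')
        for r in body.split('\n'):
            if r:
                yield json.loads(r)

rows_rdd = paths_rdd.mapPartitions(get_data_partition)
df = spark.createDataFrame(rows_rdd, schema=schema)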

EDIT 6/14/18:

Regarding the gzip data, you can decompress a stream of gzip data in Python as detailed in this answer: https://stackoverflow.com/a/12572031/1461187 . Basically just pass obj.get()['Body'].read() into the function defined in that answer.
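A rough sketch of that idea plugged into the get_data helper above (get_gzip_data is an illustrative name; this reads whole objects into memory rather than truly streaming them):

import zlib

def get_gzip_data(path):
    bucket, _, key = path.replace('s3://', '', 1).partition('/')
    s3 = boto3.resource('s3')
    raw = s3.Object(bucket, key).get()['Body'].read()
    # 16 + MAX_WBITS tells zlib to expect a gzip header and trailer
    text = zlib.decompress(raw, 16 + zlib.MAX_WBITS).decode('utf-8')
    return [json.loads(r) for r in text.split('\n') if r]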

Ryan Widmaier
  • These files are approx. 60 MB in size each, so I can't consolidate them further. I will give it a try and see if it helps improve the performance. – Abdul Mannan Jun 13 '18 at 17:38
  • You should be able to combine them as long as you pick a file format that is splittable (i.e. not gzip). It would probably require some extra data-load steps to convert your data over, but it's a lot easier to work with if you can just convert everything to Parquet or Avro before saving to S3 or doing any extensive work with it. Then you get the schema reading and file splitting out of the box with no extra work. – Ryan Widmaier Jun 13 '18 at 17:45
  • Yup, that is my intention, but first I want to get this to work. JSON is a lot slower, so I'm thinking about Parquet before I start processing them. – Abdul Mannan Jun 13 '18 at 17:52
  • Parquet is great when working with HDFS, but for S3 make sure you look at the optimizations you need to set to avoid extra writing. I generally prefer Avro with S3 for that reason. – Ryan Widmaier Jun 13 '18 at 17:54
  • Is there a way we can directly use the file paths with a DataFrame? I would like to avoid working with RDDs. – Abdul Mannan Jun 13 '18 at 18:13
  • You could do the equivalent using UDFs, but I don't think it would save you much, since it would still require Python code to run on the executors to do the data loading. The only other way I know of is the way you were already trying: passing a file list or glob to the DataFrame load function. – Ryan Widmaier Jun 13 '18 at 18:16
  • You forgot that these files are gzipped, so we have to unzip them before we can convert to JSON (and I'm not sure how much that will impact performance and worker memory utilization, depending on file size). – Abdul Mannan Jun 14 '18 at 12:34

There are two performance issues surfacing here:

  1. Reading the files: gzip files can't be split to have their workload shared across workers, though with 50 MB files there's little benefit in splitting things up anyway.
  2. The way the S3 connectors Spark uses mimic a directory structure is a real performance killer for complex directory trees.

Issue #2 is what slows down partitioning: the initial work of deciding what to do, which happens before any of the computation starts.

How would I go about dealing with this? Well, there's no magic switch here. But:

  • have fewer, bigger files; as noted, Avro is good, and so are Parquet and ORC later on.
  • use a very shallow directory tree. Are these files all in one single directory, or in a deep directory tree? The latter is worse.

Coalesce the files first.
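As a rough sketch of that consolidation pass, reusing file_list and schema from the question (the output path, format and partition count here are illustrative assumptions, and it presumes you can afford to read the data through once):

# one-off consolidation: read the many small gzipped JSON files once,
# then write them back out as a few hundred larger columnar files
df = spark.read.schema(schema).json(file_list)
df.coalesce(200).write.mode('overwrite').parquet('s3a://some_bucket/consolidated/')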

I'd also avoid any kind of schema inference; it sounds like you aren't doing that (good!), but for anyone else reading this answer: know that for CSV, and presumably JSON, schema inference means "read through all the data once just to work out the schema".
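For anyone who does need a starting point, a minimal sketch of supplying an explicit schema instead of inferring one (the field names below are hypothetical, not taken from the question):

from pyspark.sql.types import StructType, StructField, StringType, LongType

# hypothetical fields; replace with the real layout of the JSON logs
schema = StructType([
    StructField('timestamp', StringType(), True),
    StructField('user_id', LongType(), True),
    StructField('event', StringType(), True),
])

# with an explicit schema, Spark skips the "read everything once" inference pass
df = spark.read.schema(schema).json(file_list)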

stevel
  • Unfortunately, they are in a deep directory structure. I have plans to use a different format than gzipped JSON, but there are some limitations, so I have to at least make it work with the current setup. – Abdul Mannan Jun 14 '18 at 10:29
  • Compression/format changes will speed things up on the workers, but do nothing for the partitioning process up front. Sorry – stevel Jun 14 '18 at 13:17