I'm running a 12-node Spark cluster on AWS EMR using m3.xlarge instances, and am trying to process 36GB of gzipped CSV data. When I run the following code, I get the error:
source_data = spark.read.csv('s3://my-bucket/my_csv_files/',
                             sep='\t', header=True, nullValue='\\N',
                             escape='"', schema=schema)
data = (source_data.rdd.keyBy(lambda x: x['id'])
        .groupByKey())
Reason: Container killed by YARN for exceeding memory limits. 10.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Yet when I run the same code with a sortBy added, it works just fine:
data = (source_data.rdd.keyBy(lambda x: x['id'])
        .sortBy(lambda x: x[1]['file_timestamp'])
        .groupByKey())
Why does adding the sortBy make the difference, and how should I go about troubleshooting this? I'm running the cluster with maximizeResourceAllocation enabled, and I've tried setting spark.yarn.executor.memoryOverhead to a larger value without much luck.