I'm doing a simple groupBy on a fairly small dataset (80 files in HDFS, a few GB in total). I'm running Spark on 8 low-memory machines in a YARN cluster, i.e. something along the lines of:
spark-submit ... --master yarn-client --num-executors 8 --executor-memory 3000m --executor-cores 1
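(That's roughly 8 × 3 GB ≈ 24 GB of executor memory across the cluster, several times the size of the input.)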
The dataset consists of strings of length 500-2000.
I'm trying to do a simple groupByKey (see below), but it fails with a java.lang.OutOfMemoryError: GC overhead limit exceeded exception:
val keyvals = sc.newAPIHadoopFile("hdfs://...")
.map( someobj.produceKeyValTuple )
keyvals.groupByKey().count()
I can count the group sizes using reduceByKey without problems, which reassures me that the problem isn't caused by a single excessively large group, nor by an excessive number of groups:
keyvals.map(s => (s._1, 1)).reduceByKey((a,b) => a+b).collect().foreach(println)
// produces:
// (key1,139368)
// (key2,35335)
// (key3,392744)
// ...
// (key13,197941)
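Back-of-the-envelope: the largest group shown (key3, ~390k values of at most 2000 characters each) is under 1 GB of raw character data, although I realise JVM string overhead will inflate that once the group is materialised in memory.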
I've tried repartitioning, reshuffling, and increasing the groupBy level of parallelism:
keyvals.groupByKey(24).count // fails
keyvals.groupByKey(3000).count // fails
keyvals.coalesce(24, true).groupByKey(24).count // fails
keyvals.coalesce(3000, true).groupByKey(3000).count // fails
keyvals.coalesce(24, false).groupByKey(24).count // fails
keyvals.coalesce(3000, false).groupByKey(3000).count // fails
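(As far as I understand, repartition(n) is just shorthand for coalesce(n, shuffle = true), so keyvals.repartition(3000).groupByKey(3000).count should be equivalent to the shuffle = true attempts above.)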
I've also tried playing around with spark.default.parallelism, and increasing spark.shuffle.memoryFraction to 0.8 while lowering spark.storage.memoryFraction to 0.1.
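These are just the standard config keys, passed at submit time, i.e. something along the lines of (the spark.default.parallelism value here is only one of the values I tried):

spark-submit ... --master yarn-client --num-executors 8 --executor-memory 3000m --executor-cores 1 \
  --conf spark.default.parallelism=3000 \
  --conf spark.shuffle.memoryFraction=0.8 \
  --conf spark.storage.memoryFraction=0.1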
The failing stage (count) will fail on task 2999 of 3000.
I can't find anything suggesting that groupBy shouldn't simply spill to disk instead of keeping everything in memory, yet I just can't get it to work, even on this fairly small dataset. This obviously shouldn't be the case, and I must be doing something wrong, but I have no idea where to start debugging this!