3

I'm trying to run a job on Yarn mode that processes a large amount of data (2TB) read from google cloud storage.

The pipeline can be summarized like this :

sc.textFile("gs://path/*.json")\
.map(lambda row: json.loads(row))\
.map(toKvPair)\
.groupByKey().take(10)

 [...] later processing on collections and output to GCS.
  This computation over the elements of collections is not associative,
  each element is sorted in it's keyspace.

When run on 10GB of data, it's completed without any issue. However when I run it on the full dataset, it fails all the time with this logs in the containers:

15/11/04 16:08:07 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster has disassociated: xxxxxxxxxxx
15/11/04 16:08:07 ERROR org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend: Yarn application has already exited with state FINISHED!
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1299, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/usr/lib/spark/python/pyspark/context.py", line 916, in runJob
15/11/04 16:08:07 WARN org.apache.spark.ExecutorAllocationManager: No stages are running, but numRunningTasks != 0
    port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 36, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job cancelled because SparkContext was shut down

I tried investigating by launching each operation one-by-one by connecting to the master and it seems to fail on the groupBy. I also tried to rescale the cluster by adding nodes and upgrading their memory and number of CPUs but I still have the same issue.

120 Nodes + 1 master with the same specs : 8 vCPU - 52GB Mem

I tried to find threads with a similar issue without success so I don't really know what information I should provide since the logs aren't very clear, so feel free to ask for more info.

The primary key is a required value for every record and we need all the keys without filter, that represents roughly 600k keys. Is it really possible to perform this operation without to scale the cluster to something massive? I just read databricks did a sort on 100TB of data (https://databricks.com/blog/2014/10/10/spark-petabyte-sort.html) wich also involves a massive shuffle. They succeeded by replacing multiple in-memory buffers by single one resulting in a lot of Disk IO ? Is it possible with my cluster scale to perform such operation?

Paul K.
  • 796
  • 2
  • 7
  • 20
  • 2
    Could you share more details on your toKvPair function? How many values are expected per key when grouped, roughly? Does it produce a "null" key for some records which might not have the expected key? – Dennis Huo Nov 04 '15 at 17:43
  • Yes of course, I just edited to add more information. – Paul K. Nov 04 '15 at 21:57
  • So, the root of the problem is likely to be the fact that groupByKey introducing a non-scalable per-key bottleneck, rather than you running into a "total dataset size" constraint. In the petasort benchmark, typically keys are still themselves unique or limited to very few duplicates, regardless of overall dataset size, which enables scaling up with larger clusters. See this [avoid groupByKey](https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html) post by Databricks for a description of how "hot keys" may be a pitfall. – Dennis Huo Nov 04 '15 at 22:03
  • Now, if the 2TB are completely evenly distributed among the 600k keys, it shouldn't be a problem and it should be possible to find some other bottleneck you're running into, since that divides out to only 3MB per group. The problem would be if you have any skewed highly-popular key, where you need something like 4GB of the data all going into a single key. Then groupByKey will run into issues. – Dennis Huo Nov 04 '15 at 22:05
  • Could you try a [countByKey](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.countByKey) instead of a groupByKey and then maybe sort them and get the max count? Counts should be doable even if you have skewed keys because it should be able to combine-sum incrementally without ever loading all values for a single key into memory. – Dennis Huo Nov 04 '15 at 22:07
  • Yes, I just read this post too. I clearly understand the "hot key" issue... I was just reading your last comment when trying to query BigQuery, where the source data is stored, to get the maxcount since I deleted the cluster while I get more informed on the subject. The maximum count is 500k records for one key (~11GB) which is a lot since the average is around 15k. – Paul K. Nov 04 '15 at 22:29
  • I could try to strip those keys from the input by using a window (since the records are events that occur at a fixed interval). – Paul K. Nov 04 '15 at 22:31
  • Do you think that could work with the scale of my cluster or should I rescale it? – Paul K. Nov 04 '15 at 22:37
  • Indeed, stripping the hot keys should make it work even on relatively small clusters. In general scaling more nodes can't actually fix hot keys if using groupbykey, doing so would be more along the lines of larger individual machines and explicitly setting larger spark executor memory. – Dennis Huo Nov 05 '15 at 00:09

1 Answers1

2

To summarize what we learned through the comments on the original question, if a small dataset works (especially one that potentially fits in a single machine's total memory) and then a large dataset fails despite adding significantly more nodes to the cluster, combined with any usage of groupByKey, the most common thing to look for is whether your data has significant imbalance of the number of records per key.

In particular, groupByKey as of today still has a constraint in that not only must all values for a single key get shuffled to the same machine, they must all be able to fit in memory as well:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L503

/**
 * Group the values for each key in the RDD into a single sequence. Allows controlling the
 * partitioning of the resulting key-value pair RDD by passing a Partitioner.
 * The ordering of elements within each group is not guaranteed, and may even differ
 * each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 *
 * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
 * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
 */

There's some further discussion of this problem which points at a mailing list discussion which includes some discussion of workarounds; namely, you may be able to explicitly append a hash string of the value/record to the key, hashed into some small set of buckets, so that you manually shard out your large groups.

In your case you could even do a .map transform initially which only conditionally adjusts the keys of known hot keys to divide it up into subgroups while leaving non-hot keys unaltered.

In general, the "in-memory" constraint means you can't really get around significantly skewed keys by adding more nodes, since it requires scaling "in-place" on the hot node. For specific cases you may be able to set spark.executor.memory as a --conf or in dataproc gcloud beta dataproc jobs submit spark [other flags] --properties spark.executor.memory=30g, as long as the max key's values can all fit in that 30g (with some headroom/overhead as well). But that will top out at whatever largest machine is available, so if there's any chance that the size of the max key will grow when your overall dataset grows, it's better to change the key distribution itself rather than try to crank up single-executor memory.

Community
  • 1
  • 1
Dennis Huo
  • 10,517
  • 27
  • 43
  • 1
    Thank you very much for that detailed answer ! I split the keys in shards appending the record timestamp's date to the key and it worked, which also makes the data sorting faster and easy to reduce after to get full information for each key. However I still get an "Out of memory error" further in my pipeline (before reducing sharded data of keys), but I will ask it in another post. Anyway thank you for this great tips. – Paul K. Nov 05 '15 at 13:34
  • Here is the issue : http://stackoverflow.com/questions/33547649/pyspark-reducebykey-causes-out-of-memory, I'm not sure the fact that one executor is used is linked to the groupBy but maybe you could help. – Paul K. Nov 05 '15 at 15:02