I'm trying to run a job in YARN mode that processes a large amount of data (2 TB) read from Google Cloud Storage.
The pipeline can be summarized like this:
sc.textFile("gs://path/*.json") \
  .map(lambda row: json.loads(row)) \
  .map(toKvPair) \
  .groupByKey().take(10)
[...] later processing on collections and output to GCS.
This later computation over the elements of each collection is not associative; every element is sorted within its keyspace.
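To make that concrete, the per-key step behaves roughly like the simplified sketch below (the real sort key and computation are different; this is only an illustration of why the work for one key can't be combined in arbitrary order):

def process_group(key, values):
    # The values of one key must be fully sorted before this pass; every
    # output element depends on its predecessor, so the work for a key
    # cannot be split up and recombined (it is not associative).
    ordered = sorted(values)
    return key, [b - a for a, b in zip(ordered, ordered[1:])]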
When run on 10 GB of data, it completes without any issue. However, when I run it on the full dataset, it consistently fails with these logs in the containers:
15/11/04 16:08:07 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster has disassociated: xxxxxxxxxxx
15/11/04 16:08:07 ERROR org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend: Yarn application has already exited with state FINISHED!
15/11/04 16:08:07 WARN org.apache.spark.ExecutorAllocationManager: No stages are running, but numRunningTasks != 0
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1299, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/usr/lib/spark/python/pyspark/context.py", line 916, in runJob
    port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 36, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
I tried investigating by launching each operation one by one from the master, and it seems to fail on the groupByKey step (sketched below). I also tried rescaling the cluster by adding nodes and upgrading their memory and number of CPUs, but I still hit the same issue.
120 nodes + 1 master, all with the same specs: 8 vCPUs, 52 GB of memory
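For reference, this is roughly how I isolated the failing step from the pyspark shell on the master (sc is the shell's SparkContext, toKvPair is the same helper as in the pipeline above, and the path is a placeholder):

import json

raw = sc.textFile("gs://path/*.json")
print(raw.count())                 # reading alone completes

parsed = raw.map(lambda row: json.loads(row)).map(toKvPair)
print(parsed.count())              # parsing and keying completes

grouped = parsed.groupByKey()
print(grouped.take(10))            # fails here with the SparkContext shutdown above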
I tried to find threads about a similar issue without success, so I don't really know what information I should provide since the logs aren't very clear. Feel free to ask for more info.
The primary key is a required value for every record, and we need all the keys without any filtering, which represents roughly 600k keys. Is it really possible to perform this operation without scaling the cluster to something massive? I just read that Databricks did a sort on 100 TB of data (https://databricks.com/blog/2014/10/10/spark-petabyte-sort.html), which also involves a massive shuffle. They succeeded by replacing multiple in-memory buffers with a single one, resulting in a lot of disk IO. Is it possible to perform such an operation at my cluster's scale?
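In case it helps, this is the kind of reformulation I'm considering instead of groupByKey, to lean on the sort-based shuffle rather than buffering whole groups. It is only a sketch: parsed stands for the keyed RDD produced by the toKvPair map above, and sort_field and the partition count are placeholders I haven't validated.

from itertools import groupby
from pyspark.rdd import portable_hash

# Composite key (primary_key, sort_field) so the shuffle itself orders the
# records on disk; partition on the primary key only, so all records of a
# key land in the same partition.
keyed = parsed.map(lambda kv: ((kv[0], sort_field(kv[1])), kv[1]))

sorted_rdd = keyed.repartitionAndSortWithinPartitions(
    numPartitions=2000,
    partitionFunc=lambda ck: portable_hash(ck[0]))

def per_key(records):
    # Records arrive grouped by primary key and already ordered by sort_field,
    # so the order-dependent computation can stream one key at a time.
    for key, group in groupby(records, key=lambda kv: kv[0][0]):
        yield key, [value for _, value in group]

result = sorted_rdd.mapPartitions(per_key, preservesPartitioning=True)

The idea is that the per-key ordering would happen during the shuffle (spilling to disk) instead of after groupByKey in Python memory.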