
I am getting a shuffle exception on a count() and need help. This is the error:

21/12/17 11:01:47 INFO DAGScheduler: Job 20 failed: count at /home/spark/jobs.zip/tca/jobs/participation_on_volume_metric/participation_on_volume_metric_job.py:388, took 1283.109346 s
21/12/17 11:01:47 INFO DAGScheduler: Resubmitting ShuffleMapStage 130 (leftOuterJoin at /home/spark/jobs.zip/tca/jobs/participation_on_volume_metric/participation_on_volume_metric_job.py:261) and ShuffleMapStage 132 (leftOuterJoin at /home/spark/jobs.zip/tca/jobs/participation_on_volume_metric/participation_on_volume_metric_job.py:277) due to fetch failure
Traceback (most recent call last):
  File "/home/spark/pywrap.py", line 53, in <module>
    app.run(main=main, argv=[sys.argv[0]] + unparsed)
  File "/home/spark/jobs.zip/tca/platform/app.py", line 20, in run
  File "/home/spark/libs.zip/absl/app.py", line 300, in run
  File "/home/spark/libs.zip/absl/app.py", line 251, in _run_main
  File "/home/spark/pywrap.py", line 32, in main
    job.analyze(spark_context, arguments, {'config': job_conf})
  File "/home/spark/jobs.zip/tca/jobs/participation_on_volume_metric/participation_on_volume_metric_job.py", line 388, in analyze
  File "/home/spark/libs.zip/pyspark/rdd.py", line 1055, in count
  File "/home/spark/libs.zip/pyspark/rdd.py", line 1046, in sum
  File "/home/spark/libs.zip/pyspark/rdd.py", line 917, in fold
  File "/home/spark/libs.zip/pyspark/rdd.py", line 816, in collect
  File "/home/spark/libs.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/home/spark/libs.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: A shuffle map stage with indeterminate output was failed and retried. However, Spark cannot rollback the ShuffleMapStage 132 to re-process the input data, and has to fail this job. Please eliminate the indeterminacy by checkpointing the RDD before repartition and try again.

It failed at the count() after the RDD unions:

orders_metric_rdd = sc.union([orders_with_mic_metric_rdd,
                              orders_with_childs_metric_rdd,
                              orders_without_childs_metric_rdd])

orders_metric_rdd.cache()

partitions = max(1, orders_metric_rdd.count())
partitions = min(partitions, max_partitions)
Guille

1 Answer


From the error log, it looks like you need to checkpoint the RDD to eliminate the indeterminacy, as the exception message suggests. You can do so like this:

# The checkpoint directory must be set before checkpoint() is called
sc.setCheckpointDir("/tmp/checkpoint_dir/")

orders_metric_rdd = sc.union([orders_with_mic_metric_rdd,
                              orders_with_childs_metric_rdd,
                              orders_without_childs_metric_rdd])

# Truncate the lineage so the shuffle output is no longer indeterminate
# on retry; the RDD is written out on the next action (the count below)
orders_metric_rdd.checkpoint()

partitions = max(1, orders_metric_rdd.count())
partitions = min(partitions, max_partitions)
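Note that checkpoint() is lazy: nothing is written until the next action runs, so the count() right after it is what actually materializes the checkpoint and truncates the lineage. Also, when running on a cluster, the checkpoint directory should point at shared storage such as HDFS rather than a node-local /tmp path, since all executors need to be able to read it.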
BoomBoxBoy
  • Can you explain the checkpoint concept to me? Thank you. – Guille Dec 18 '21 at 18:16
  • Basically a checkpoint saves the RDD to reliable storage (the checkpoint directory, typically on HDFS) and truncates its lineage. It is essentially a breakpoint in the execution plan. You might find this discussion helpful -> [Checkpoint vs Persist](https://stackoverflow.com/questions/35127720/what-is-the-difference-between-spark-checkpoint-and-persist-to-a-disk) – BoomBoxBoy Dec 18 '21 at 23:10
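To illustrate the difference between the two, here is a minimal sketch (not from the original job; sc is assumed to be an existing SparkContext, and the checkpoint path is hypothetical):

# persist()/cache() keeps the computed partitions around for reuse, but
# retains the full lineage, so Spark can still recompute them after a failure
rdd = sc.parallelize(range(100)).map(lambda x: x * 2)
rdd.persist()    # or rdd.cache() for the default storage level
rdd.count()      # materializes the persisted partitions

# checkpoint() writes the RDD to the checkpoint directory and truncates
# the lineage, so recomputation from the original inputs is no longer possible
# or necessary
sc.setCheckpointDir("/tmp/checkpoint_dir/")  # hypothetical path
rdd2 = sc.parallelize(range(100)).map(lambda x: x * 2)
rdd2.checkpoint()
rdd2.count()     # the first action triggers the checkpoint write

Because the checkpointed data survives independently of its lineage, a retried shuffle stage reads the same input data every time, which is what eliminates the indeterminacy the error message complains about.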