
I'm working on a pipeline that reads a number of Hive tables and parses them into DenseVectors for eventual use in SparkML. I want to do a lot of iteration to find optimal training parameters, both model inputs and computing resources. The dataframe I'm working with is somewhere between 50 and 100 GB in total, spread across a dynamic number of executors on a YARN cluster.

Whenever I try to save, either to Parquet or with saveAsTable, I get a series of failed tasks before the job finally fails completely and suggests raising spark.yarn.executor.memoryOverhead. Each id is a single row, no more than a few KB.

feature_df.write.parquet('hdfs:///user/myuser/featuredf.parquet',mode='overwrite',partitionBy='id')

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 98 in stage 33.0 failed 4 times, most recent failure: Lost task 98.3 in 
stage 33.0 (TID 2141, rs172.hadoop.pvt, executor 441): ExecutorLostFailure 
(executor 441 exited caused by one of the running tasks) Reason: Container 
killed by YARN for exceeding memory limits. 12.0 GB of 12 GB physical memory used. 
Consider boosting spark.yarn.executor.memoryOverhead.

I currently have this at 2g.

Spark workers are currently getting 10 GB each, and the driver (which is not on the cluster) is getting 16 GB with a maxResultSize of 5 GB.
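
For reference, a minimal sketch (an assumption, not part of the original post) of how that combination of settings might look when the session is built in PySpark. The app name is hypothetical, and in yarn-client mode the driver memory itself still has to be supplied to spark-submit before the driver JVM starts, so it is left out here:

from pyspark.sql import SparkSession

# Sketch only: mirrors the settings described above.
spark = (
    SparkSession.builder
    .appName('feature_pipeline')                           # hypothetical name
    .config('spark.executor.memory', '10g')
    .config('spark.driver.maxResultSize', '5g')
    .config('spark.yarn.executor.memoryOverhead', '2048')  # the 2g overhead, given in MB for Spark 2.x
    .enableHiveSupport()
    .getOrCreate()
)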

I'm caching the dataframe before I write; what else can I do to troubleshoot?
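
Not something from the thread, but a small sketch of one way to look for imbalance before the write, assuming the cached dataframe is called feature_df and using the source table name that appears in the plan shown in the edit below:

from pyspark.sql import functions as F

# Rows per partition of the dataframe about to be written; a few huge
# partitions would point to skew or to too few partitions overall.
row_counts = (
    feature_df.rdd
    .mapPartitions(lambda it: [sum(1 for _ in it)])
    .collect()
)
print(len(row_counts), min(row_counts), max(row_counts))

# Rows per id in the long source table (name taken from the plan) show
# whether some ids feed far more segments than others into collect_list.
(
    spark.table('pmccarthy.reka_data_long_all_files')
    .groupBy('id')
    .count()
    .orderBy(F.desc('count'))
    .show(10)
)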

Edit: It seems like it's trying to do all of my transformations at once. When I look at the details for the saveAsTable() method:

== Physical Plan ==
InMemoryTableScan [id#0L, label#90, features#119]
   +- InMemoryRelation [id#0L, label#90, features#119], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Filter (isnotnull(id#0L) && (id#0L < 21326835))
            +- InMemoryTableScan [id#0L, label#90, features#119], [isnotnull(id#0L), (id#0L < 21326835)]
                  +- InMemoryRelation [id#0L, label#90, features#119], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *Project [id#0L, label#90, pythonUDF0#135 AS features#119]
                           +- BatchEvalPython [<lambda>(collect_list_is#108, 56845.0)], [id#0L, label#90, collect_list_is#108, pythonUDF0#135]
                              +- SortAggregate(key=[id#0L, label#90], functions=[collect_list(indexedSegs#39, 0, 0)], output=[id#0L, label#90, collect_list_is#108])
                                 +- *Sort [id#0L ASC NULLS FIRST, label#90 ASC NULLS FIRST], false, 0
                                    +- Exchange hashpartitioning(id#0L, label#90, 200)
                                       +- *Project [id#0L, UDF(segment#2) AS indexedSegs#39, cast(label#1 as double) AS label#90]
                                          +- *BroadcastHashJoin [segment#2], [entry#12], LeftOuter, BuildRight
                                             :- HiveTableScan [id#0L, label#1, segment#2], MetastoreRelation pmccarthy, reka_data_long_all_files
                                             +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
                                                +- *Project [cast(entry#7 as string) AS entry#12]
                                                   +- HiveTableScan [entry#7], MetastoreRelation reka_trop50, public_crafted_audiences_sized
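
One hedged sketch of something to try against the everything-at-once behaviour (a suggestion, not something taken from the thread): force the cached relation to materialize, or checkpoint it, before calling the write, so the save job reads from the cache or checkpoint instead of re-running the whole plan above in one pass. The checkpoint directory here is hypothetical.

# Materialize the cache before the write.
feature_df = feature_df.cache()
feature_df.count()

# Or, on Spark 2.1+, cut the lineage entirely by checkpointing to HDFS first.
spark.sparkContext.setCheckpointDir('hdfs:///user/myuser/checkpoints')  # hypothetical path
feature_df = feature_df.checkpoint(eager=True)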
Patrick McCarthy
  • Have you checked whether your data is getting skewed during the many iterations you are performing? Here is one example where someone was facing a similar situation - http://stackoverflow.com/questions/43081465/spark-container-executor-ooms-during-reducebykey – Pushkr Apr 09 '17 at 15:51
  • No, because I don't want to start performing the iterations until I can read the prepared data directly from disk at the start of the process, and so there haven't been any iterations yet. – Patrick McCarthy Apr 10 '17 at 13:32

2 Answers


My suggestion would be to disable dynamic allocation. Try running it with the configuration below:

--master yarn-client --driver-memory 15g --executor-memory 15g --executor-cores 10 --num-executors 15 --conf spark.dynamicAllocation.enabled=false --conf spark.yarn.executor.memoryOverhead=20000 --conf spark.yarn.driver.memoryOverhead=20000 --conf spark.default.parallelism=500
Sanchit Grover
  • No luck. Everything goes well until it tries to execute the saveAsTable() on one of the large dataframes. Produces: "17/04/11 03:36:14 ERROR cluster.YarnScheduler: Lost executor 4 on rs209.hadoop.pvt: Container killed by YARN for exceeding memory limits. 17.0 GB of 17 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. " – Patrick McCarthy Apr 11 '17 at 03:51
  • --master yarn-client --driver-memory 15g --executor-memory 15g --executor-cores 10 --num-executors 15 -Dspark.yarn.executor.memoryOverhead=100000 -Dspark.yarn.driver.memoryOverhead=20000 -Dspark.default.parallelism=500 – Sanchit Grover Apr 11 '17 at 05:29
  • No change. I also took care to cache the dataframes before writing, no difference. – Patrick McCarthy Apr 11 '17 at 14:14

Ultimately the clue I got from the Spark user mailing list was to look at the partitions, both their balance and their sizes. As the planner had it, too much data was being given to a single executor instance. Adding .repartition(1000) to the expression creating the dataframe to be written made all the difference, and more gains could probably be achieved by creating a clever key column and partitioning on it.
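
For concreteness, a minimal sketch of what that change might look like, reusing the dataframe and path from the question; the bucketed-key variant at the end is only an illustration of the "clever key column" idea, not what was actually run:

from pyspark.sql import functions as F

# Repartition before writing so no single executor holds an outsized share.
feature_df = feature_df.repartition(1000)
feature_df.write.parquet('hdfs:///user/myuser/featuredf.parquet', mode='overwrite')

# Hypothetical refinement: bucket the high-cardinality id into a bounded key
# and partition the output on that instead of on raw id.
bucketed = feature_df.withColumn('id_bucket', F.col('id') % 200)
(
    bucketed
    .repartition(1000, 'id_bucket')
    .write
    .partitionBy('id_bucket')
    .parquet('hdfs:///user/myuser/featuredf.parquet', mode='overwrite')
)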

Patrick McCarthy