1

We have a weird situation when processing the following steps using Pyspark 1.6.2 via Jupyter notebook on a Hortonworks cluster:

  1. Reading data from ORC table in a pyspark dataframe
  2. Pivoting this table by pivot_column (pivoted_df)
  3. Adding some calculated columns on a specific selection of the pivoted_df: calced_df = pivoted_df.select(dependency_list).run_calculations()
  4. Inner Join on the big table pivoted_df (columns > 1.600) and the "small" table calced_df (only ~270 columns) to union all the columns
  5. Saving to Parquet table

(In Step 3 the selection is necessary because otherwise it would take a long time to add some calculated fields with withColumn statement. Select + Join is faster than withColumn on a table with a lot of columns)

However, for a dataset, where pivot_column has less than 2.500 variations, the job is working fine. For example, we processed the job successfully starting with 75.430.000 rows and 1.600 columns. When we process the job with another dataset, containing less rows (50.000) and more columns (2.433), it is working, too.

But finally the job crashes in the last step, since pivot_column has more than 2500 variations (and only around 70.000 rows) with a Stackoverflow error. We debugged the single steps with some show() actions, to examine where exactly the job is failing. We found out, that everything works fine, up to the Join in Step 4. So the join is causing the problems and since this step we're getting the following message:

Py4JJavaError: An error occurred while calling o4445.showString.
: java.lang.StackOverflowError
    at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:24)
    at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:22)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.SetBuilder.$plus$plus$eq(SetBuilder.scala:22)
    at scala.collection.TraversableLike$class.to(TraversableLike.scala:629)
    at scala.collection.AbstractTraversable.to(Traversable.scala:105)
    at scala.collection.TraversableOnce$class.toSet(TraversableOnce.scala:267)
    at scala.collection.AbstractTraversable.toSet(Traversable.scala:105)
    at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild$lzycompute(TreeNode.scala:86)
    at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild(TreeNode.scala:86)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:280)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)

But what exactly is causing this error and how can we avoid it?

Our current Spark configurations:

.setMaster("yarn-client") \
.set("spark.executor.instances","10") \
.set("spark.executor.cores","4") \
.set("spark.executor.memory","10g") \
.set("spark.driver.memory","8g") \
.set("spark.yarn.executor.memoryOverhead","1200") \
.set("spark.sql.pivotMaxValues", "6000") \
.set("spark.sql.inMemoryColumnarStorage.batchSize", "1000")

Many thanks in advance!

MelD
  • 31
  • 6
  • 2
    Spark is great for tall tables (lots of rows) but it does not scale well for wide tables (lots of columns). You may be running into the limits of what your system can handle- see [this comment](https://stackoverflow.com/questions/39373322/spark-dataframe-maximum-column-count#comment66083545_39375863). – pault Aug 06 '18 at 13:32
  • 1
    More discussion in the comments of [this question](https://stackoverflow.com/questions/44557739/maximum-number-of-columns-we-can-have-in-dataframe-spark-scala). In practice, what I've seen done in these cases is to break up the wide table into a number of thinner tables. Then when you need to use it, do a join using a primary key. – pault Aug 06 '18 at 14:15

1 Answers1

0

The executors hold the data that is saved to parquet as row format (in spark 1.6 - changed in 2.x see https://spoddutur.github.io/spark-notes/deep_dive_into_storage_formats.html) before compressing and saving it - you can either increase the number of partitions (spark.sql.shuffle.partitions) or the memory for the executors

Note that python takes some of the memory for the spark executor (a spark.python.worker.memory setting controls that)

Another thing you can help is sort the data before saving to parquet

Arnon Rotem-Gal-Oz
  • 25,469
  • 3
  • 45
  • 68
  • We already tried **sorting** the data before saving it to Parquet, unfortunately that didn't solve the issue (And we already tried saving the data in ORC format)... Now we first tried to increase the number of **spark.sql.shuffle.paritions** to 400, then to 600, but that didn't make any difference. Then we tried it with more **exectuor.memory** (20g) (and 400 partitions) - Same error again. We also have implemented some cache() steps in the job (for example after the pivot step) and found out that when we do a show() after the cache(), this leads to the same error... – MelD Aug 06 '18 at 13:24
  • we're pivoting with smaller number of columns (~500) but at our data size we had to go with 2000-4000 partitions to stave off OOMs – Arnon Rotem-Gal-Oz Aug 06 '18 at 14:22