
I'm trying to join two DataFrames, one with around 10 million records and the other about a third of that size. Since the smaller DataFrame fits comfortably in executor memory, I perform a broadcast join and then write out the result:

val df = spark.read.parquet("/plablo/data/tweets10M")
  .select("id", "content", "lat", "lon", "date")
val fullResult = FilterAndClean.performFilter(df, spark)
  .select("id", "final_tokens")
  .filter(size($"final_tokens") > 1)
val fullDFWithClean = {
  df.join(broadcast(fullResult), "id")
}
fullDFWithClean
  .write
  .partitionBy("date")
  .mode(SaveMode.Overwrite)
  .parquet("/plablo/data/cleanTokensSpanish")

After a while, I get this error:

org.apache.spark.SparkException: Exception thrown in awaitResult: 
at org.apache.spark.util.ThreadUtils$.awaitResultInForkJoinSafely(ThreadUtils.scala:215)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:125)
at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:124)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:124)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:123)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:197)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:82)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:36)
at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:68)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:88)
at org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:209)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at org.apache.spark.sql.execution.FileSourceScanExec.consume(DataSourceScanExec.scala:141)
at org.apache.spark.sql.execution.FileSourceScanExec.doProduceVectorized(DataSourceScanExec.scala:392)
at org.apache.spark.sql.execution.FileSourceScanExec.doProduce(DataSourceScanExec.scala:315)
.....

There's this question that addresses the same issue. In the comments there, it's suggested that increasing spark.sql.broadcastTimeout could fix the problem, but even after setting a large value (5000 seconds) I still get the same error (although much later, of course).
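For reference, this is how I raise the timeout (a minimal sketch; the value is in seconds, and the default is 300):

```scala
// Raise spark.sql.broadcastTimeout from its 300-second default.
// This only delays the failure if the broadcast itself is too slow or too big.
spark.conf.set("spark.sql.broadcastTimeout", "5000")
```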

The original data is partitioned by the date column. The function that returns fullResult performs a series of narrow transformations and filters the data, so I assume the partitioning is preserved.

The physical plan confirms that Spark will perform a BroadcastHashJoin:

*Project [id#11, content#8, lat#5, lon#6, date#150, final_tokens#339]
+- *BroadcastHashJoin [id#11], [id#363], Inner, BuildRight
:- *Project [id#11, content#8, lat#5, lon#6, date#150]
:  +- *Filter isnotnull(id#11)
:     +- *FileScan parquet [lat#5,lon#6,content#8,id#11,date#150] 
Batched: true, Format: Parquet, Location: 
InMemoryFileIndex[hdfs://geoint1.lan:8020/plablo/data/tweets10M], 
PartitionCount: 182, PartitionFilters: [], PushedFilters: 
[IsNotNull(id)], ReadSchema: 
struct<lat:double,lon:double,content:string,id:int>
   +- BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
      +- *Project [id#363, UDF(UDF(UDF(content#360))) AS 
 final_tokens#339]
     +- *Filter (((UDF(UDF(content#360)) = es) && (size(UDF(UDF(UDF(content#360)))) > 1)) && isnotnull(id#363))
        +- *FileScan parquet [content#360,id#363,date#502] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://geoint1.lan:8020/plablo/data/tweets10M], PartitionCount: 182, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<content:string,id:int>

I believe that, given the size of my data, this operation should be relatively fast (on 4 executors with 5 cores and 4g of RAM each, running on YARN in cluster mode).

Any help is appreciated.

Muhammad Dyas Yaskur
plablo09
  • Do you think it could be the case that some partitions of your first dataset are too large, so the result of joining them with the broadcast dataset is too big? You can check the size of each partition and the distribution of `id` values to verify it. – Thang Nguyen Nov 21 '17 at 01:21
  • I don't think that's an issue; id is a unique row identifier, so it should be evenly distributed – plablo09 Nov 21 '17 at 14:47

2 Answers


In situations like this, the first question is: how big is the DataFrame you are trying to broadcast? It's worth estimating its size (see this SO answer and this one as well).
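One rough way to get that estimate is from Catalyst's own statistics (a sketch, assuming Spark 2.3+, where the statistics are exposed as plan.stats; in 2.2 the call is stats(conf), and the number is a heuristic upper bound, not an exact measurement):

```scala
// Catalyst's size estimate for the plan that would be broadcast.
val estimatedBytes = fullResult.queryExecution.optimizedPlan.stats.sizeInBytes
println(s"Estimated broadcast size: ${estimatedBytes / (1024 * 1024)} MB")
```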

Note that Spark's default spark.sql.autoBroadcastJoinThreshold is only 10MB, so you are really not supposed to broadcast very large datasets.
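If you want to control when Spark picks broadcast joins on its own, you can tune or disable that threshold (a sketch; the value is in bytes, and -1 disables automatic broadcasting entirely while explicit broadcast() hints still apply):

```scala
// Disable automatic broadcast joins; Spark will then prefer
// shuffle-based joins unless a broadcast() hint forces otherwise.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
```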

Your use of broadcast takes precedence and may be forcing Spark to do something it would otherwise choose not to do. A good rule is to force aggressive optimization only when the default behavior is unacceptable, because aggressive optimization often creates edge conditions, like the one you are experiencing.
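Concretely, simply dropping the hint lets the planner fall back to a shuffle-based join (a sketch using the names from the question):

```scala
// Without the broadcast() hint, Spark is free to choose a
// SortMergeJoin when the right side exceeds the broadcast threshold.
val fullDFWithClean = df.join(fullResult, "id")
```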

Sim
  • I'm taking your advice, but at the same time, I'm partitioning both DataFrames by id. In this scenario, the default behavior is to perform a SortMergeJoin which, if I can get the partitions right should be fast enough. I'll report back – plablo09 Nov 21 '17 at 19:31
  • That might help, but keep in mind that partitioning by `id` will already force a shuffle for both datasets, so I'm not entirely sure what you are saving, since you attempted to force a broadcast in order to avoid a shuffle in the first place... – Sim Nov 21 '17 at 23:21
  • How did you wind up resolving this issue? – CubemonkeyNYC Dec 04 '18 at 01:38
  • Hi I spotted the same error message in my glue job logs, but I still can't fix it, here's my question: https://stackoverflow.com/questions/66135124/glue-job-succeeded-without-stepfunction-but-failed-if-its-kicked-off-by-stepfu, could you please take a look please? – wawawa Feb 10 '21 at 14:10

This can also fail if spark.task.maxDirectResultSize is not increased. Its default is 1 megabyte (1m). Try spark.task.maxDirectResultSize=10g.
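This property must be set before the SparkContext starts, e.g. on the SparkConf or via spark-submit --conf (a sketch; note that some Spark versions reject size suffixes like "10g" for this property and expect a plain byte count, which may explain the "invalid syntax" comment below):

```scala
import org.apache.spark.SparkConf

// Must be set before the SparkContext is created.
// If "10g" is rejected, pass the value in bytes instead, e.g. "10737418240".
val conf = new SparkConf()
  .set("spark.task.maxDirectResultSize", "10g")
```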

pourhaus
  • spark.task.maxDirectResultSize=10g shows invalid syntax. – muglikar Sep 08 '20 at 18:18
  • Hi I spotted the same error message in my glue job logs, but I still can't fix it, here's my question: https://stackoverflow.com/questions/66135124/glue-job-succeeded-without-stepfunction-but-failed-if-its-kicked-off-by-stepfu, could you please take a look please? – wawawa Feb 10 '21 at 14:10