
I am using Spark 1.5.

I have two dataframes of the form:

scala> libriFirstTable50Plus3DF
res1: org.apache.spark.sql.DataFrame = [basket_id: string, family_id: int]

scala> linkPersonItemLessThan500DF
res2: org.apache.spark.sql.DataFrame = [person_id: int, family_id: int]

libriFirstTable50Plus3DF has 766,151 records, while linkPersonItemLessThan500DF has 26,694,353 records. Note that I am calling repartition(number) on linkPersonItemLessThan500DF, since I intend to join these two later on. I follow the above code with:

val userTripletRankDF = linkPersonItemLessThan500DF
     .join(libriFirstTable50Plus3DF, Seq("family_id"))
     .take(20)
     .foreach(println(_))

for which I am getting this output:

16/12/13 15:07:10 INFO scheduler.TaskSetManager: Finished task 172.0 in stage 3.0 (TID 473) in 520 ms on mlhdd01.mondadori.it (199/200)
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:63)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
 at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
 at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:190)
 at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
 at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
 at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
 at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
 at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
 at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
 at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1315)
 at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1378)
 at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
 at org.apache.spark.sql.DataFrame.show(DataFrame.scala:402)
 at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363)
 at org.apache.spark.sql.DataFrame.show(DataFrame.scala:371)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:77)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:79)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:81)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:83)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:85)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:87)
 at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:89)
 at $iwC$$iwC$$iwC$$iwC.<init>(<console>:91)
 at $iwC$$iwC$$iwC.<init>(<console>:93)
 at $iwC$$iwC.<init>(<console>:95)
 at $iwC.<init>(<console>:97)
 at <init>(<console>:99)
 at .<init>(<console>:103)
 at .<clinit>(<console>)
 at .<init>(<console>:7)
 at .<clinit>(<console>)
 at $print(<console>)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
 at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
 at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
 at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
 at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
 at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
 at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
 at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
 at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
 at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
 at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
 at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
 at org.apache.spark.repl.Main$.main(Main.scala:31)
 at org.apache.spark.repl.Main.main(Main.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
 at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
 at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

and I don't understand what the issue is. Is it as simple as increasing the waiting time? Is the join too intensive? Do I need more memory? Is the shuffling intensive? Can anyone help?

– Christos Hadjinikolis
  • Let me mention my answer to a very similar question: https://stackoverflow.com/a/48449467/418293 – mathieu Jan 25 '18 at 18:22

4 Answers


This happens because Spark tries to do a Broadcast Hash Join and one of the DataFrames is very large, so sending it consumes a lot of time.

You can:

  1. Set a higher spark.sql.broadcastTimeout to increase the timeout, e.g. spark.conf.set("spark.sql.broadcastTimeout", 36000).
  2. persist() both DataFrames; Spark will then use a shuffle join instead (see the sketch after this list).
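
A minimal sketch of both options, assuming the Spark 1.5 shell from the question, where sqlContext is predefined (on Spark 2.x+ you would use spark.conf.set instead); the timeout value and storage level are illustrative:

import org.apache.spark.storage.StorageLevel

// Option 1: raise the broadcast timeout from the default 300 s to 10 h.
sqlContext.setConf("spark.sql.broadcastTimeout", "36000")

// Option 2: persist both sides, so Spark plans a shuffle join instead
// of broadcasting the smaller table.
val left  = linkPersonItemLessThan500DF.persist(StorageLevel.MEMORY_AND_DISK)
val right = libriFirstTable50Plus3DF.persist(StorageLevel.MEMORY_AND_DISK)
left.join(right, Seq("family_id")).take(20).foreach(println)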

PySpark

In PySpark, you can set the config when you build the Spark session, in the following manner:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Your App")
    .config("spark.sql.broadcastTimeout", "36000")
    .getOrCreate()
)
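
The value is in seconds, so "36000" gives the broadcast up to ten hours.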
– T. Gawęda

Just to add some code context to the very concise answer from @T. Gawęda.


In your Spark application, Spark SQL chose a broadcast hash join for the join because "libriFirstTable50Plus3DF has 766,151 records", which happened to be below the so-called broadcast threshold (10MB by default).

You can control the broadcast threshold using the spark.sql.autoBroadcastJoinThreshold configuration property.

spark.sql.autoBroadcastJoinThreshold Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1, broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run.
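
For example, to turn automatic broadcasting off entirely, a one-line sketch for the Spark 1.5 shell, where sqlContext is predefined:

// -1 disables automatic broadcast joins; Spark falls back to a shuffle-based join.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")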

You can find that particular type of join in the stack trace:

org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
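
You could also confirm the chosen join up front with explain; a quick sketch reusing the DataFrame names from the question:

// Print the physical plan and look for a BroadcastHashJoin operator.
linkPersonItemLessThan500DF
  .join(libriFirstTable50Plus3DF, Seq("family_id"))
  .explain()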

The BroadcastHashJoin physical operator in Spark SQL uses a broadcast variable to distribute the smaller dataset to the Spark executors (rather than shipping a copy of it with every task).

If you used explain to review the physical query plan, you'd notice that the query uses the BroadcastExchangeExec physical operator. That is where you can see the underlying machinery for broadcasting the smaller table (and the timeout).

override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
  ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
}

doExecuteBroadcast is part of the SparkPlan contract that every physical operator in Spark SQL follows; it allows for broadcasting when needed. BroadcastExchangeExec happens to need it.

The timeout parameter is what you are looking for.

private val timeout: Duration = {
  val timeoutValue = sqlContext.conf.broadcastTimeout
  if (timeoutValue < 0) {
    Duration.Inf
  } else {
    timeoutValue.seconds
  }
}

As you can see, you can disable the timeout completely (using a negative value), which would mean waiting indefinitely for the broadcast variable to be shipped to the executors, or set sqlContext.conf.broadcastTimeout, which is exactly the spark.sql.broadcastTimeout configuration property. The default value is 5 * 60 = 300 seconds, which you can see in the stack trace:

java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
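
A one-line sketch of the disable option (Spark 1.5 shell):

// A negative value becomes Duration.Inf: wait for the broadcast forever.
sqlContext.setConf("spark.sql.broadcastTimeout", "-1")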

– Jacek Laskowski
  • There may be different reasons why this timeout occurs. One such reason is a lack of resources to run the executor(s) on the cluster. spark.scheduler.minRegisteredResourcesRatio and spark.scheduler.maxRegisteredResourcesWaitingTime can be used to make the execution wait until resources are available. – Eugene Feb 14 '20 at 00:24
  • @Eugene I find myself in a case where I am not making joins and can't find any join in my stack trace, so I am inclined to believe that your comment is true. How can I find out whether lack of resources is my problem? – frammnm Mar 10 '20 at 11:29
  • @frammnm I got curious about your case. Could you ask a separate question and include the execution plan? Thanks! – Jacek Laskowski Mar 11 '20 at 07:32
  • @JacekLaskowski Hello, I have been working with Event Hubs, and I noticed that when you don't send messages to the Event Hub for the Spark streaming job for some time, this exception about futures timing out occurs. I think it has to do with the library that handles the Event Hub hitting a timeout exception in its promise handling. I hope this helps. – frammnm Mar 27 '20 at 10:33
  • @JacekLaskowski Thanks for the answer! `persist` avoids recomputing the whole lineage, but why does it help reduce the broadcasting time so that we don't hit the broadcast timeout? – jack Nov 04 '20 at 20:01

In addition to increasing spark.sql.broadcastTimeout or persisting both DataFrames, you may also try to:

  1. disable broadcasting by setting spark.sql.autoBroadcastJoinThreshold to -1, or
  2. increase the Spark driver memory by setting spark.driver.memory to a higher value (a sketch of both follows).
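
A sketch of both suggestions; the values are illustrative assumptions, not tuned recommendations:

// 1. Turn automatic broadcasting off for the session (Spark 1.5 shell):
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")

// 2. Driver memory must be fixed before the driver JVM starts, so pass
//    it at launch time, e.g.: spark-submit --driver-memory 8g ...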

– lasclocker
  • If the error is due to a timeout, why would `spark.driver.memory` help? – jack Nov 04 '20 at 19:57
  • @jack Broadcasting means that the entire broadcasted dataset has to be collected in the driver before being sent out to the workers. If your driver doesn't have enough memory, then things can go badly. – Logister Nov 22 '21 at 20:59

In my case, it was caused by a broadcast over a large DataFrame:

df.join(broadcast(largeDF))

So, based on the previous answers, I fixed it by removing the broadcast:

df.join(largeDF)
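
For reference, the broadcast hint comes from org.apache.spark.sql.functions; if you do keep a hint, it should mark the genuinely small side. smallDF below is a hypothetical stand-in for a table under the broadcast threshold:

import org.apache.spark.sql.functions.broadcast

// Hint the small table, never the large one.
df.join(broadcast(smallDF))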
– pedromorfeu