
I get an exception when running what seems to be a simple Spark SQL filtering job:

    someOtherDF
      .filter(/*somecondition*/)
      .select($"eventId")
      .createOrReplaceTempView("myTempTable")

    records
      .filter(s"eventId NOT IN (SELECT eventId FROM myTempTable)")

Any idea how I can solve this?

Note:

  • someOtherDF contains between ~1M and 5M rows after filtering, and the eventId values are GUIDs.
  • records contains between 40M and 50M rows.

Stack trace:

    org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResultInForkJoinSafely(ThreadUtils.scala:215)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:124)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:124)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:123)
        at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doExecute(BroadcastNestedLoopJoinExec.scala:343)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
        at ...
    Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at org.apache.spark.util.ThreadUtils$.awaitResultInForkJoinSafely(ThreadUtils.scala:212)
        ... 84 more

  • We have a similar issue of "futures timed out" (SPARK-20784), but with a very different query pattern. What type of deployment are you using? YARN-client? Other? Are the involved datasets cached? – mathieu May 18 '17 at 10:02
  • In this particular scenario, the datasets were not cached since it is a single-pass transformation. These tests were running on YARN; client or cluster mode didn't matter, if I recall correctly. – Michel Lemay May 18 '17 at 13:38
  • I finally found the cause in my case: it was an OOM on the driver (which was silent most of the time; I only saw the exception by chance in the flow of logs). – mathieu May 22 '17 at 13:30

1 Answer


Using pieces from (1) "How to exclude rows that don't join with another table?" and (2) "Spark Duplicate columns in dataframe after join":

I can solve my problem using a left outer join like this:

    val leftColKey = records("eventId")
    val rightColKey = someOtherDF("eventId")
    val toAppend: DataFrame = records
      .join(someOtherDF, leftColKey === rightColKey, "left_outer")
      .filter(rightColKey.isNull) // Keep rows without a match in 'someOtherDF'. See (1)
      .drop(rightColKey) // Needed to discard duplicate column. See (2)

Performance is really good and it does not suffer from the 'Futures timed out' problem.
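
For what it's worth, the "300 seconds" in the stack trace matches Spark's default spark.sql.broadcastTimeout, and the BroadcastNestedLoopJoinExec frame shows the NOT IN subquery being executed as a broadcast join. If you need to keep the subquery form, a rough stop-gap sketch is below; the timeout value is only an illustrative guess, and it won't help if the driver is actually running out of memory while building the broadcast, as mathieu noted in the comments.

    // Stop-gap sketch (assumption, not the accepted fix): give the broadcast
    // exchange more time before it gives up. The default is 300 seconds.
    // `spark` is the usual SparkSession (predefined in spark-shell).
    spark.conf.set("spark.sql.broadcastTimeout", "1200") // seconds, illustrative value

The join rewrite above is still the better fix, since at these row counts Spark can use a non-broadcast join strategy instead of shipping millions of GUIDs to every executor.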

EDIT

As a colleague pointed out to me, the "leftanti" join type is more efficient.
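
A minimal sketch of that variant, reusing the DataFrames from above; joining on the column name also means there is no duplicate eventId column to drop:

    import org.apache.spark.sql.DataFrame

    // Anti join: keep only rows of `records` whose eventId has no match in `someOtherDF`.
    val toAppend: DataFrame = records
      .join(someOtherDF, Seq("eventId"), "leftanti")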
