
I'm struggling to get this Spark SQL query, which uses a simple join with a few logical conditions, to work.

I can get the output with relatively small datasets; however, the situation changes with bigger ones. I want this join to work with over 14 million rows in A and 1 million rows in B.

I'm using an EMR cluster of 10 r4.4xlarge instances.

These are the configuration parameters I'm passing to the job:

spark.driver.memory 100g
spark.executor.cores    5
spark.executor.memory   39g
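
For completeness, this is roughly how I pass them in (a sketch; `job.py` stands in for the actual script name):

spark-submit \
    --conf spark.driver.memory=100g \
    --conf spark.executor.cores=5 \
    --conf spark.executor.memory=39g \
    job.py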

And these are the parameters I use to create the SparkSession:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

sq = SparkSession.builder.config('spark.rpc.message.maxSize', '1536')\
    .config("spark.sql.shuffle.partitions", 490)\
    .config("spark.sql.broadcastTimeout", 2000)\
    .config("spark.sql.autoBroadcastJoinThreshold", 1024*1024*900)\
    .getOrCreate()
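
To sanity-check that the settings actually took effect, the values can be read back from the session at runtime (keys as above):

for key in ["spark.sql.shuffle.partitions",
            "spark.sql.broadcastTimeout",
            "spark.sql.autoBroadcastJoinThreshold"]:
    print(key, sq.conf.get(key))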

Datasets A and B are produced earlier in the workflow; it is only at this query that the process dies.

sql_1 = """
        SELECT
          A.userid,
          A.eventtime,
          A.latitude as userid_latitude,
          A.longitude as userid_longitude,
          A.events,
          B.unique_reference_number,
          B.name as poi_name,
          B.pointx_classification_name as poi_classification_name,
          B.brand,
          B.lat as poi_latitude,
          B.long as poi_longitude,
          acos(sin(pi()*A.latitude/180.0)*sin(pi()*B.lat/180.0)+cos(pi()*A.latitude/180.0)*cos(pi()*B.lat/180.0)*cos(pi()*B.long/180.0-pi()*A.longitude/180.0))*6371 as distance,
          B.poi_radious_meters/1000 as poi_radious_km,
        CASE WHEN (acos(sin(pi()*A.latitude/180.0)*sin(pi()*B.lat/180.0)+cos(pi()*A.latitude/180.0)*cos(pi()*B.lat/180.0)*cos(pi()*B.long/180.0-pi()*A.longitude/180.0))*6371) <= B.poi_radious_meters/1000 THEN 1 ELSE 0 END as is_within_radius
        FROM A
        LEFT JOIN B ON array_contains(B.grid_array, A.grid_id)
        WHERE (acos(sin(pi()*A.latitude/180.0)*sin(pi()*B.lat/180.0)+cos(pi()*A.latitude/180.0)*cos(pi()*B.lat/180.0)*cos(pi()*B.long/180.0-pi()*A.longitude/180.0))*6371) <= 0.6
        """ 
interim = sq.sql(sql_1)

# Aggregate the events
output = interim.groupBy("userid", "eventtime", "unique_reference_number") \
    .agg(F.sum('events').alias("events"))
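
For reference, the distance expression in the SQL is the spherical law of cosines with an Earth radius of 6371 km (pi()*x/180.0 is just degrees-to-radians). Here is the same formula as a DataFrame-functions helper; this is a sketch only, and the helper name `great_circle_km` is mine:

import pyspark.sql.functions as F

def great_circle_km(lat1, lon1, lat2, lon2):
    # acos(sin(lat1)*sin(lat2) + cos(lat1)*cos(lat2)*cos(lon2 - lon1)) * 6371,
    # i.e. the expression the query repeats three times, with angles in radians.
    return F.acos(
        F.sin(F.radians(lat1)) * F.sin(F.radians(lat2))
        + F.cos(F.radians(lat1)) * F.cos(F.radians(lat2))
        * F.cos(F.radians(lon2) - F.radians(lon1))
    ) * 6371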

This is the error that I'm getting:

: org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResultInForkJoinSafely(ThreadUtils.scala:215)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:124)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:124)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:123)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doExecute(BroadcastNestedLoopJoinExec.scala:343)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:235)
    at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:124)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:42)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:368)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation.buildBuffers(InMemoryRelation.scala:96)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation.<init>(InMemoryRelation.scala:85)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryRelation.scala:41)
    at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:98)
    at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:65)
    at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:89)
    at org.apache.spark.sql.Dataset.persist(Dataset.scala:2479)
    at org.apache.spark.sql.Dataset.cache(Dataset.scala:2489)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [2000 seconds]

Without any broadcast configuration in the SparkSession builder, I get this error instead:

org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResultInForkJoinSafely(ThreadUtils.scala:215)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:124)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:124)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:123)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doExecute(BroadcastNestedLoopJoinExec.scala:343)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:235)
    at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:124)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:42)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:368)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation.buildBuffers(InMemoryRelation.scala:96)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation.<init>(InMemoryRelation.scala:85)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryRelation.scala:41)
    at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:98)
    at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:65)
    at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:89)
    at org.apache.spark.sql.Dataset.persist(Dataset.scala:2479)
    at org.apache.spark.sql.Dataset.cache(Dataset.scala:2489)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job 14 cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:809)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:807)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:807)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1738)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1657)
    at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1826)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1283)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1825)
    at org.apache.spark.SparkContext$$anon$3.run(SparkContext.scala:1770)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:629)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:934)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:275)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:78)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:75)
    at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:94)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:74)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:74)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

Do you know the best way to tune Spark so that this query works with large datasets?

  • Why did you use `spark.sql.autoBroadcastJoinThreshold`? Looks like the data broadcast is too big and the network not so fast and it times out --> "BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:131)" – Jacek Laskowski Oct 10 '17 at 10:58
  • Previously I didn't have it, but the same error was coming up. I did some quick research and decided to add it to see if the situation would change, but sadly it did not. – ultraInstinct Oct 10 '17 at 11:03
  • Could you remove all the other "things" you think did not help and start over. I'm curious what can cause it and could help (but without being told that's not needed or similar). – Jacek Laskowski Oct 10 '17 at 11:05
  • Ok - I will run it again without those things. I'll let you know in a few minutes – ultraInstinct Oct 10 '17 at 11:07
  • I added the error log for the run without the broadcast parameters to the question – ultraInstinct Oct 10 '17 at 11:14
  • have a look to my answer to a similar question : https://stackoverflow.com/questions/40474057/what-are-possible-reasons-for-receiving-timeoutexception-futures-timed-out-afte/48449467#48449467. It may help – mathieu Jan 25 '18 at 18:28

0 Answers