18

I have built a few Spark Structured Streaming queries to run on EMR. They are long-running queries and need to run at all times, since they are all ETL-type queries. When I submit a job to the YARN cluster on EMR, I can submit a single Spark application, so that Spark application should run multiple streaming queries.

I am confused about how to build/start multiple streaming queries within the same spark-submit programmatically.

For example, I have this code:

case class SparkJobs(prop: Properties) extends Serializable {
  def run() = {
      Type1SparkJobBuilder(prop).build().awaitTermination()
      Type2SparkJobBuilder(prop).build().awaitTermination()
  }
}

I fire this in my main class with `SparkJobs(new Properties()).run()`.

When I look in the Spark History Server, only the first streaming job (Type1SparkJob) is running.

What is the recommended way to fire multiple streaming queries within the same spark-submit programmatically? I could not find proper documentation on this either.

Jacek Laskowski
  • 72,696
  • 27
  • 242
  • 420
N_C
  • 952
  • 8
  • 17

2 Answers

39

Since you're calling awaitTermination on the first query, it's going to block until it completes before starting the second query. So you want to kick off both queries, and then use StreamingQueryManager.awaitAnyTermination to wait on them.

val query1 = df.writeStream.start()
val query2 = df.writeStream.start()

spark.streams.awaitAnyTermination()
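
For a fuller picture, here's a self-contained sketch of the same pattern; the rate source, console sinks, app name, and query names are just illustrative placeholders for your real ETL queries:

import org.apache.spark.sql.SparkSession

object MultiQueryApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("multi-query-demo").getOrCreate()

    // Placeholder input: the built-in rate source; swap in your real sources
    val df = spark.readStream.format("rate").option("rowsPerSecond", "10").load()

    // start() returns immediately, so both queries run concurrently
    val query1 = df.writeStream.queryName("query1").format("console").start()
    val query2 = df.writeStream.queryName("query2").format("console").start()

    // Block the driver until any query terminates (or fails)
    spark.streams.awaitAnyTermination()
  }
}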

In addition to the above, by default Spark uses the FIFO scheduler, which means the first query gets all resources in the cluster while it's executing. Since you're trying to run multiple queries concurrently, you should switch to the FAIR scheduler.
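
For example, a minimal sketch of switching the scheduler mode (the app name is made up; you can equally pass the setting on the spark-submit command line or in spark-defaults.conf):

import org.apache.spark.sql.SparkSession

// The scheduler mode has to be in place when the SparkContext is created,
// e.g. via spark-submit:
//   spark-submit --conf spark.scheduler.mode=FAIR ...
// or when building the session:
val spark = SparkSession.builder()
  .appName("my-streaming-app")
  .config("spark.scheduler.mode", "FAIR")
  .getOrCreate()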

If you have some queries that should have more resources than others, you can also tune the individual scheduler pools.
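
As a rough sketch of per-query pools (the pool names and DataFrames below are placeholders, and the pools themselves need to be defined in the fair scheduler allocation file referenced by spark.scheduler.allocation.file):

// bigDf / smallDf stand in for your own streaming DataFrames.
// Assign a thread-local scheduler pool before starting each query;
// "heavy" and "light" are example pool names from fairscheduler.xml.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "heavy")
val bigQuery = bigDf.writeStream.format("console").start()

spark.sparkContext.setLocalProperty("spark.scheduler.pool", "light")
val smallQuery = smallDf.writeStream.format("console").start()

spark.streams.awaitAnyTermination()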

Silvio
  • 3,947
  • 21
  • 22
  • Thanks for the detailed answer. I found an example of `awaitAnyTermination` [here](https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-demo-StreamingQueryManager-awaitAnyTermination-resetTerminated.html) and tried it in `spark-shell`. Even though I don't have a spark-defaults.conf file, both jobs in the example started; does that mean the default scheduler is FAIR for spark-shell, or for Spark in general? – N_C Oct 15 '18 at 17:22
  • It just depends on the number of cores you have available. For example, query1 needs 10 cores, query2 needs 5. If you have a cluster with 20 cores there's no problem. – Silvio Oct 15 '18 at 22:57
  • Well, multiple queries vs. a single query per Spark application: which one makes more sense? – lifeisshort May 03 '19 at 06:56
-5

val query1 = ds.writeStream.{...}.start()

val query2 = ds.writeStream.{...}.start()

val query3 = ds.writeStream.{...}.start()

query3.awaitTermination()

awaitTermination() will block your process until it finishes, which will never happen in a streaming app; calling it on your last query should fix your problem.

dunlu_98k
  • 209
  • 2
  • 3
  • 11