I am working with Spark Structured Streaming and am having trouble running multiple write streams in the same application. Below is my code:

DataWriter.writeStreamer(firstTableData,"parquet",CheckPointConf.firstCheckPoint,OutputConf.firstDataOutput)
DataWriter.writeStreamer(secondTableData,"parquet",CheckPointConf.secondCheckPoint,OutputConf.secondDataOutput)
DataWriter.writeStreamer(thirdTableData,"parquet", CheckPointConf.thirdCheckPoint,OutputConf.thirdDataOutput)

where writeStreamer is defined as follows:

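import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.OutputMode

// format is taken as a parameter so the definition matches the four-argument calls above
def writeStreamer(input: DataFrame, format: String, checkPointFolder: String, output: String) = {

  val query = input
    .writeStream
    .format(format)
    .option("checkpointLocation", checkPointFolder)
    .option("path", output)
    .outputMode(OutputMode.Append)
    .start()

  query.awaitTermination()
}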

The problem is that only the first table is written by the Spark writeStream; nothing happens for the other tables. Do you have any idea why?

zero323
scalacode

3 Answers

5

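query.awaitTermination() should be called only after the last stream has been started. The call blocks, so when it sits inside writeStreamer the first invocation never returns and the second and third streams are never started.

The writeStreamer function can be modified to return a StreamingQuery instead of calling awaitTermination at that point: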

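import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

// Starts the query and returns its handle; start() does not block.
// format is taken as a parameter to match the four-argument calls in the question.
def writeStreamer(input: DataFrame, format: String, checkPointFolder: String, output: String): StreamingQuery = {
  input
    .writeStream
    .format(format)
    .option("checkpointLocation", checkPointFolder)
    .option("path", output)
    .outputMode(OutputMode.Append)
    .start()
}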

Then you will have:

val query1 = DataWriter.writeStreamer(...)
val query2 = DataWriter.writeStreamer(...)
val query3 = DataWriter.writeStreamer(...)

query3.awaitTermination()
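
A slightly fuller sketch of the same pattern, in case it helps. Note that query3.awaitTermination() alone keeps the driver alive only as long as the third query runs, so this version waits on every handle in turn. The built-in "rate" test source, the /tmp/sketch paths, the object name MultiStreamSketch and the format parameter are assumptions made for illustration and are not from the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

object MultiStreamSketch {

  // Starts a file-sink query and returns its handle without blocking.
  def writeStreamer(input: DataFrame, format: String,
                    checkPointFolder: String, output: String): StreamingQuery =
    input.writeStream
      .format(format)
      .option("checkpointLocation", checkPointFolder)
      .option("path", output)
      .outputMode(OutputMode.Append)
      .start()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("multi-stream-sketch")
      .master("local[*]")
      .getOrCreate()

    // Assumption: the "rate" source stands in for the real input tables.
    val source = spark.readStream.format("rate").option("rowsPerSecond", "5").load()

    // All three queries are started before anything blocks.
    // Each query gets its own checkpoint and output directory (hypothetical paths).
    val queries = Seq("first", "second", "third").map { name =>
      writeStreamer(source, "parquet",
        s"/tmp/sketch/checkpoints/$name", s"/tmp/sketch/output/$name")
    }

    // Block on each handle in turn; the driver stays up until all have terminated.
    queries.foreach(_.awaitTermination())
  }
}
```

Each awaitTermination() call rethrows the failure of its own query, so a failing stream still surfaces as an exception in the driver.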
bp2010
1

If you want the writers to run in parallel, you can use

sparkSession.streams.awaitAnyTermination()

and remove query.awaitTermination() from the writeStreamer method.
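
A minimal sketch of how this could look, assuming a local session, the built-in "rate" test source and hypothetical /tmp paths (none of these come from the question; startWriters and AwaitAnySketch are illustrative names). The driver still has to block somewhere: if awaitTermination() is removed and nothing replaces it, main returns and the application shuts down while the queries are still running.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery

object AwaitAnySketch {

  // Hypothetical helper: starts one Parquet writer per name and returns the handles.
  def startWriters(spark: SparkSession, names: Seq[String], baseDir: String): Seq[StreamingQuery] = {
    val source = spark.readStream.format("rate").load()
    names.map { name =>
      source.writeStream
        .format("parquet")
        .option("checkpointLocation", s"$baseDir/checkpoints/$name")
        .option("path", s"$baseDir/output/$name")
        .start() // returns immediately; the query runs in the background
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("await-any-sketch")
      .master("local[*]")
      .getOrCreate()

    startWriters(spark, Seq("first", "second", "third"), "/tmp/sketch")
    println(s"Active queries: ${spark.streams.active.length}")

    // Blocks until any one of the active queries stops or fails.
    spark.streams.awaitAnyTermination()

    // Stop whatever is still running before shutting the session down.
    spark.streams.active.foreach(_.stop())
    spark.stop()
  }
}
```

awaitAnyTermination() returns as soon as one query ends, so this variant suits the case where a single failed stream should bring the whole application down.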

Shreyansh
0

In the older DStream-based Spark Streaming API, the number of concurrent jobs defaults to 1, which means only one job is active at a time.

Did you try increasing the number of concurrent jobs in the Spark conf?

sparkConf.set("spark.streaming.concurrentJobs","3")

Not an official source: http://why-not-learn-something.blogspot.com/2016/06/spark-streaming-performance-tuning-on.html

maxime G
  • I removed the awaitTermination from the writer, and now I am getting a new error: ERROR org.apache.spark.sql.execution.datasources.FileFormatWriter: Aborting job null. org.apache.spark.SparkException: Job 1 cancelled because SparkContext was shut down – scalacode Jul 18 '18 at 12:57