0

Following is how my code sequenced,

//Accumulator initialized 
val count = new LongAccumulator
sparksession.sparkContext.register(count,"count accumulator")

// Streaming Transformation
val DF = fromKafkaDF.map{
  count.add(1)
  println(count.value)  // This value is one
  //some transformation
}.writeStream.outputMode("update").format("console").start()

//trying to access the value of accumulator from driver
println(count.value)  //this value is zero

Why the value of accumulator is zero in driver? I have other logic to work on based on this accumulator. Please suggest.

Shaido
  • 27,497
  • 23
  • 70
  • 73
prady
  • 563
  • 4
  • 9
  • 24

2 Answers2

0

For you to accumulate any value in counter, you should perform an action and check, accumulator wont be in use without any action, please check the link accumulator explained

Harshit Saklecha
  • 100
  • 1
  • 10
0

use it in listeners like,

class TestListner(acc: LongAccumulator) extends StreamingQueryListener {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    println("onQueryStarted   :" + event.toString)
  }

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    println(acc)
    acc.reset()
    println("onQueryProgress    :" + event.progress)
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    println("onQueryProgress    :" + event)
  }
}

and addListener in your main application,

    spark.streams.addListener(new TestListner(acc))
Alen Peter
  • 126
  • 4
  • 10