
I am not able to update an accumulator's value inside the DataFrame.map function. Please find the code below.

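// Assumes spark-shell (Spark 1.x), where sc and sqlContext.implicits._ are already in scope
import org.apache.spark.AccumulatorParam
import org.apache.spark.sql.Row

case class TestPerson(name: String, age: Long, salary: Double)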

val tom = TestPerson("Tom Hanks",37,35.5)
val sam = TestPerson("Sam Smith",40,40.5)
val stev = TestPerson("Stev Smith",45,30.5)

val PersonList = scala.collection.mutable.MutableList[TestPerson]()

PersonList += tom
PersonList += sam
PersonList += stev

val personDF = PersonList.toDF()

// Accumulator that concatenates the Rows added by each task
class ListAccumulatorParam extends AccumulatorParam[List[Row]] {

  // Every partial accumulator starts from an empty list
  def zero(initialValue: List[Row]): List[Row] = {
    List.empty
  }

  // Merge two partial results by concatenation
  def addInPlace(l1: List[Row], l2: List[Row]): List[Row] = {
    l1 ::: l2
  }
}

val listAccum = sc.accumulator(List[Row]())(new ListAccumulatorParam())
personDF.map { row => listAccum += List(row) }

listAccum stays empty.

But if I instead parallelize the collected rows and then check, the accumulator does get updated:

sc.parallelize(personDF.collect()).foreach(row => listAccum += List(row))

The actual use case: I want to perform some further action on each row, and if that action fails, I want that set of rows back. That is why I want the Rows in an accumulator.

Am I doing something wrong that causes listAccum to stay empty?

Kalpesh

1 Answer


I was not running any action. DataFrame.map is a lazy transformation, so without an action the closure never executed and the accumulator never received any values.
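A minimal sketch of the fix, assuming spark-shell on Spark 1.x (where sc and sqlContext.implicits._ are already in scope). It contrasts the lazy map with foreach, which is an action and therefore actually runs the closure. The DataFrame is built with Seq(...).toDF() instead of the MutableList only to keep the snippet short; the class and value names otherwise mirror the question.

```scala
// Sketch for spark-shell on Spark 1.x (assumption): sc and
// sqlContext.implicits._ are already in scope.
import org.apache.spark.AccumulatorParam
import org.apache.spark.sql.Row

case class TestPerson(name: String, age: Long, salary: Double)

// Same merge logic as in the question: concatenate partial lists.
class ListAccumulatorParam extends AccumulatorParam[List[Row]] {
  def zero(initialValue: List[Row]): List[Row] = List.empty
  def addInPlace(l1: List[Row], l2: List[Row]): List[Row] = l1 ::: l2
}

val personDF = Seq(
  TestPerson("Tom Hanks", 37, 35.5),
  TestPerson("Sam Smith", 40, 40.5),
  TestPerson("Stev Smith", 45, 30.5)
).toDF()

val listAccum = sc.accumulator(List.empty[Row])(new ListAccumulatorParam)

// map is a transformation: it only records the computation, so no task
// runs and listAccum is still empty after this line.
val mapped = personDF.map { row => listAccum += List(row) }
println(listAccum.value.size)

// foreach is an action: it launches a job, and each finished task's
// partial list is merged into listAccum on the driver.
personDF.foreach { row => listAccum += List(row) }
println(listAccum.value.size)
```

This is also why the sc.parallelize(personDF.collect()).foreach(...) variant worked: foreach is an action. Calling personDF.foreach directly should give the same result without first collecting every row to the driver.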

I found the explanation in the question "When are accumulators truly reliable?"
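The point of that question is that Spark only guarantees each task's accumulator update is applied exactly once for updates made inside actions; updates made inside transformations such as map may be applied again if a task or stage is re-executed. For the use case in the question (getting back the rows whose further processing fails), a sketch could therefore do the work inside foreach. It again assumes spark-shell on Spark 1.x, and processRow is a hypothetical stand-in for the real per-row action (here it simply fails for ages above 40).

```scala
// Sketch for spark-shell on Spark 1.x (assumption): sc and
// sqlContext.implicits._ are already in scope.
import org.apache.spark.AccumulatorParam
import org.apache.spark.sql.Row
import scala.util.Try

case class TestPerson(name: String, age: Long, salary: Double)

class ListAccumulatorParam extends AccumulatorParam[List[Row]] {
  def zero(initialValue: List[Row]): List[Row] = List.empty
  def addInPlace(l1: List[Row], l2: List[Row]): List[Row] = l1 ::: l2
}

// Hypothetical per-row step standing in for "some more action".
// age is the second column of TestPerson (index 1).
val processRow: Row => Unit = { row =>
  if (row.getLong(1) > 40)
    throw new IllegalStateException(s"could not process ${row.getString(0)}")
}

val personDF = Seq(
  TestPerson("Tom Hanks", 37, 35.5),
  TestPerson("Sam Smith", 40, 40.5),
  TestPerson("Stev Smith", 45, 30.5)
).toDF()

val failedRows = sc.accumulator(List.empty[Row])(new ListAccumulatorParam)

// The update happens inside foreach (an action), so a restarted task
// does not add its rows a second time.
personDF.foreach { row =>
  if (Try(processRow(row)).isFailure) failedRows += List(row)
}

failedRows.value.foreach(println)
```

Accumulator values are sent back to the driver, so this suits a small number of failed rows; if many rows can fail, keeping the failures as a filtered RDD or DataFrame is likely a better fit. The order of rows in the accumulated list depends on the order in which tasks finish.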

Kalpesh