I have a bunch of messages in Kafka and am using Spark Streaming to process them.

I am trying to catch the case where my code fails to insert into my DB, and then take those messages and put them back into Kafka so I can process them later.

To do this, I declare a variable called "success" inside my foreachRDD function. When I attempt the DB update, the insert function returns a boolean indicating whether the insert succeeded. During testing I noticed that this does not work when the insert happens inside foreachPartition: the value of "success" appears to be "reset" once execution leaves the foreachPartition function.

stream: DStream[String]

stream
  .foreachRDD(rdd => {
    if (!rdd.isEmpty()) {
      var success = true
      rdd.foreachPartition(partitionOfRecords => {
        if (partitionOfRecords.nonEmpty) {
          val listOfRecords = partitionOfRecords.toList
          val successfulInsert: Boolean = insertRecordsToDB(listOfRecords)
          logger.info("Insert was successful: " + successfulInsert)
          if (!successfulInsert) {
            logger.info("logging successful as false. Currently its set to: " + success )
            success = false
            logger.info("logged successful as false. Currently its set to: " + success )

          }
        }
      })

      logger.info("Insert into database successful from all partition: " + success)
      if (!success) {
        // send data to Kafka topic
      }

    }
  })

The output from my logs then shows this!

2019-06-24 20:26:37 [INFO] Insert was successful: false
2019-06-24 20:26:37 [INFO] logging successful as false. Currently its set to: true
2019-06-24 20:26:37 [INFO] logged successful as false. Currently its set to: false
2019-06-24 20:26:37 [INFO] Insert into database successful from all partition: true

Even though the third log line shows that "success" is currently false, when I log it again outside of foreachPartition it is back to true.

Can anyone explain why? Or suggest a different approach?

alex
  • Possible duplicate of [Scala spark, listbuffer is empty](https://stackoverflow.com/questions/40699432/scala-spark-listbuffer-is-empty) – user10938362 Jun 25 '19 at 07:01
  • Thanks for flagging this as a duplicate. I see that it is one, but there is no way I would have found that question. Its title, "Scala spark, listbuffer is empty", gives no indication of the shared memory techniques in Spark. Until that title is changed I will not accept this as a duplicate, since the title of my question is more likely to lead others with the same problem here, where I have posted an answer using accumulators. – alex Jun 25 '19 at 12:43

1 Answer

I was able to get this to work using an accumulator.

stream: DStream[String]

val dbInsertACC = sparkSession.sparkContext.longAccumulator("insertSuccess")

stream
  .foreachRDD(rdd => {
    if (!rdd.isEmpty()) {
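      // Reset on the driver at the start of each batch so that a failure
      // counted in an earlier batch does not carry over into this one
      dbInsertACC.reset()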
      rdd.foreachPartition(partitionOfRecords => {
        if (partitionOfRecords.nonEmpty) {
          val listOfRecords = partitionOfRecords.toList
          val successfulInsert: Boolean = insertRecordsToDB(listOfRecords)
          logger.info("Insert was successful: " + successfulInsert)
          if (!successfulInsert) dbInsertACC.add(1)
        }
      })

      // dbInsertACC counts the partitions whose insert failed; read it on the driver
      logger.info("Number of partitions that failed to insert: " + dbInsertACC.value)
      if (!dbInsertACC.isZero) {
        // send data to Kafka topic
      }

    }
  })
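
As for why the original "success" variable appeared to reset: as far as I understand it, Spark serializes the closure passed to foreachPartition and runs it inside tasks, so the code in the task sets "success" on a deserialized copy and the variable on the driver is never touched. An accumulator is designed to send such updates back to the driver. The sketch below imitates that round trip in plain Scala, with no Spark involved. The names `ClosureCopyDemo`, `Flag` and `roundTrip` are made up for illustration and are not part of Spark.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object ClosureCopyDemo {

  // Hypothetical stand-in for the captured `success` variable.
  final class Flag(var value: Boolean) extends java.io.Serializable

  // Serializes and deserializes an object with Java serialization,
  // which yields an independent copy (an imitation of shipping a
  // closure's captured state to a task).
  def roundTrip[T <: java.io.Serializable](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val driverFlag = new Flag(true)        // lives on the "driver"
    val taskFlag = roundTrip(driverFlag)   // the copy the "task" works on

    taskFlag.value = false                 // the task records a failed insert

    println("task copy: " + taskFlag.value)         // prints "task copy: false"
    println("driver original: " + driverFlag.value) // prints "driver original: true"
  }
}
```

This matches the logs in the question: the value is false inside the partition function and still true once back in foreachRDD.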
alex