I have a bunch of messages in Kafka and I'm using Spark Streaming to process them.
I'm trying to catch when my code fails to insert into my DB, then take those messages and insert them back into Kafka so I can process them later.
To handle this, I create a variable called "success" inside my foreachRDD function. When I attempt the insert into the DB, I return a boolean indicating whether it succeeded. What I've noticed during testing is that this doesn't work when the insert happens inside foreachPartition: the success value seems to get "reset" once execution leaves the foreachPartition function.
stream: DStream[String]

stream.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    var success = true
    rdd.foreachPartition(partitionOfRecords => {
      if (partitionOfRecords.nonEmpty) {
        val listOfRecords = partitionOfRecords.toList
        val successfulInsert: Boolean = insertRecordsToDB(listOfRecords)
        logger.info("Insert was successful: " + successfulInsert)
        if (!successfulInsert) {
          logger.info("logging successful as false. Currently its set to: " + success)
          success = false
          logger.info("logged successful as false. Currently its set to: " + success)
        }
      }
    })
    logger.info("Insert into database successful from all partition: " + success)
    if (!success) {
      // send data to Kafka topic
    }
  }
})
The output from my logs then shows this:
2019-06-24 20:26:37 [INFO] Insert was successful: false
2019-06-24 20:26:37 [INFO] logging successful as false. Currently its set to: true
2019-06-24 20:26:37 [INFO] logged successful as false. Currently its set to: false
2019-06-24 20:26:37 [INFO] Insert into database successful from all partition: true
Even though the third log line shows that "success" has been set to false, once execution leaves foreachPartition and I log it again, it is back to true.
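My guess is that Spark serializes the foreachPartition closure and ships a copy of `success` to each executor, so the assignment mutates the executor's copy rather than the driver's. Here is a minimal plain-Scala sketch of what I think is happening (no Spark involved; the serialization round-trip standing in for shipping the closure is my assumption, and `Flag` is a hypothetical holder for the captured variable):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical holder standing in for the captured `success` variable
case class Flag(var success: Boolean) extends Serializable

object ClosureCopyDemo {
  // Serialize then deserialize, the way a closure is shipped to an executor
  def roundTrip(flag: Flag): Flag = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(flag)
    out.close()
    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      .readObject().asInstanceOf[Flag]
  }

  def simulate(): (Boolean, Boolean) = {
    val driverFlag = Flag(success = true)      // lives on the driver
    val executorFlag = roundTrip(driverFlag)   // copy shipped to the executor
    executorFlag.success = false               // like `success = false` inside foreachPartition
    (driverFlag.success, executorFlag.success) // driver copy is unchanged
  }
}
```

If that mental model is right, it would explain why the driver-side log still prints true.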
Can anyone explain why? Or suggest a different approach?
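In case it helps frame an answer: one direction I've wondered about (untested sketch on my part, assuming LongAccumulator updates from executors really are merged back on the driver) is counting failed partitions with an accumulator instead of a captured var. The dummy `successfulInsert = false` below stands in for my real insertRecordsToDB call:

```scala
import org.apache.spark.SparkContext

object AccumulatorSketch {
  // Count partitions whose insert failed; accumulator updates are
  // sent back to the driver when the action completes.
  def failedPartitionCount(sc: SparkContext): Long = {
    val failedInserts = sc.longAccumulator("failedInserts")
    sc.parallelize(1 to 4, numSlices = 2).foreachPartition { partitionOfRecords =>
      val successfulInsert = false // stand-in for insertRecordsToDB(...)
      if (!successfulInsert) failedInserts.add(1)
    }
    failedInserts.value // readable on the driver after the action
  }
}
```

Would checking `failedInserts.value > 0` on the driver, then resending to Kafka, be the right direction?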