This is a fairly common Spark question about which piece of code is executed on which part of Spark (executor or driver). Given this piece of code, I am a bit surprised that I do not get the values I am expecting:
 1  stream
 2    .foreachRDD((kafkaRdd: RDD[ConsumerRecord[String, String]]) => {
 3      val offsetRanges = kafkaRdd.asInstanceOf[HasOffsetRanges].offsetRanges
 4      import argonaut.Argonaut.StringToParseWrap
 5
 6      val rdd: RDD[SimpleData] = kafkaRdd.mapPartitions((records: Iterator[ConsumerRecord[String, String]]) => {
 7        val invalidCount: AtomicLong = new AtomicLong(0)
 8        val convertedData: Iterator[SimpleData] = records.map(record => {
 9          val maybeData: Option[SimpleData] = record.value().decodeOption[SimpleData]
10          if (maybeData.isEmpty) {
11            logger.error("Cannot parse data from kafka: " + record.value())
12            invalidCount.incrementAndGet()
13          }
14          maybeData
15        })
16          .filter(_.isDefined)
17          .map(_.get)
18
19        val statsDClient = new NonBlockingStatsDClient("appName", "monitoring.host", 8125) // I know it should be a singleton :)
20        statsDClient.gauge("invalid-input-records", invalidCount.get())
21
22        convertedData
23      })
24
25      rdd.collect().length
26      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
27    })
The idea: consume JSON data from Kafka and report the number of entries that have an invalid format (if any). I assume that when I use the mapPartitions method, the code inside is executed once for each partition I have, i.e. I would expect lines 7-22 to be wrapped in a closure and sent to an executor for execution. In that case the invalidCount variable should be in scope on the executor and should be updated whenever an error happens during the JSON-to-object conversion (lines 10-13), because internally there is no notion of an RDD here - only a regular Scala iterator over regular entries. In lines 19-20 the statsd client sends the invalidCount value to the metrics server. Yet I always get '0' as a result.
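To spell out what I expect mapPartitions to hand to an executor, here is a minimal, Spark-free sketch of what I think the per-partition code (lines 7-22) effectively boils down to. The object and function names, the input data, and the Try-based "parsing" are made-up stand-ins for the Kafka records and the argonaut decodeOption call:

    import java.util.concurrent.atomic.AtomicLong
    import scala.util.Try

    // Spark-free sketch of the per-partition logic: a plain Scala iterator comes in,
    // the error counter lives in the closure, and the converted iterator goes out.
    object PartitionFunctionSketch extends App {

      def processPartition(records: Iterator[String]): Iterator[Int] = {
        val invalidCount = new AtomicLong(0)

        val converted: Iterator[Int] = records
          .map { value =>
            val maybeData: Option[Int] = Try(value.toInt).toOption // stand-in for decodeOption
            if (maybeData.isEmpty) invalidCount.incrementAndGet()
            maybeData
          }
          .filter(_.isDefined)
          .map(_.get)

        // Mirrors lines 19-20 above: report the per-partition error count.
        println(s"invalid-input-records: ${invalidCount.get()}")

        converted
      }

      println(processPartition(Iterator("1", "oops", "3")).toList)
    }

When I run this sketch locally it prints invalid-input-records: 0 and then List(1, 3), so the zero I see does not seem to be specific to Spark shipping the closure to an executor.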
However, if I change the code to this:
 1  stream
 2    .foreachRDD((kafkaRdd: RDD[ConsumerRecord[String, String]]) => {
 3      val offsetRanges = kafkaRdd.asInstanceOf[HasOffsetRanges].offsetRanges
 4
 5      // this is ugly we have to repeat it - but argonaut is NOT serializable...
 6      val rdd: RDD[SimpleData] = kafkaRdd.mapPartitions((records: Iterator[ConsumerRecord[String, String]]) => {
 7        import argonaut.Argonaut.StringToParseWrap
 8        val convertedDataTest: Iterator[(Option[SimpleData], String)] = records.map(record => {
 9          val maybeData: Option[SimpleData] = record.value().decodeOption[SimpleData]
10          (maybeData, record.value())
11        })
12
13        val testInvalidDataEntries: Int = convertedDataTest.count(record => {
14          val empty = record._1.isEmpty
15          if (empty) {
16            logger.error("Cannot parse data from kafka: " + record._2)
17          }
18          empty
19        })
20        val statsDClient = new NonBlockingStatsDClient("appName", "monitoring.host", 8125) // I know it should be a singleton :)
21        statsDClient.gauge("invalid-input-records", testInvalidDataEntries)
22
23        convertedDataTest
24          .filter(maybeData => maybeData._1.isDefined)
25          .map(data => data._1.get)
26      })
27
28      rdd.collect().length
29      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
30    })
it works as expected: when I count the invalid entries by running count over the iterator, I get the expected value.
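For comparison, here is the same Spark-free sketch reshaped like the second version, where the invalid entries are counted by running count over the iterator before reporting (names, data, and the Try-based parsing are again made up):

    import scala.util.Try

    // Same sketch as before, but the count is taken by consuming the iterator
    // before the metric is reported, mirroring lines 13-21 of the second version.
    object EagerCountSketch extends App {

      def processPartition(records: Iterator[String]): Iterator[Int] = {
        val parsed: Iterator[(Option[Int], String)] = records.map { value =>
          (Try(value.toInt).toOption, value) // stand-in for decodeOption, keeping the raw value for logging
        }

        val testInvalidDataEntries: Int = parsed.count { case (maybeData, raw) =>
          val empty = maybeData.isEmpty
          if (empty) println(s"Cannot parse: $raw")
          empty
        }

        // Mirrors lines 20-21 of the second version: report the counted value.
        println(s"invalid-input-records: $testInvalidDataEntries")

        parsed.filter(_._1.isDefined).map(_._1.get)
      }

      processPartition(Iterator("1", "oops", "3")) // only looking at the reported count here
    }

Run locally, this one prints Cannot parse: oops followed by invalid-input-records: 1, which matches what I see in Spark.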
I am not sure I understand why. Any ideas?
Code to play with can be found on GitHub.