
Hi all, I am working with Kafka and Spark Streaming, using the Play JSON library. I need to flatten a complex incoming Kafka message (a JSON string).

Sample Incoming Kafka Message

"""{"name" : "somename" , "age" : 20 , "address" : "{\"door_no\" : \"20A\" , \"street\" : \"new_street\"}","ip":{"local":"128.7.0.1"}}"""

As you can see, the Kafka message contains nested JSON, and the address value is a JSON object encoded as a string. So I wrote a recursive function that flattens the incoming message, and that part works perfectly.
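
For reference, a minimal sketch of what such a recursive flattener could look like with Play JSON. This is not the code from my job: the object name `JsonFlattener`, the `flatten` signature, and the `parent_child` key convention are assumptions made for illustration (the key convention is chosen to match lookups such as `map("address_door_no")` further down).

```scala
import play.api.libs.json._
import scala.util.Try

// Hypothetical helper, shown only to illustrate the flattening idea.
object JsonFlattener {
  // Flattens nested objects into "parent_child" keys. A string value that
  // itself holds a JSON object is parsed and flattened too.
  def flatten(js: JsValue, prefix: String = ""): Map[String, String] = js match {
    case obj: JsObject =>
      obj.fields.flatMap { case (key, value) =>
        flatten(value, if (prefix.isEmpty) key else s"${prefix}_$key")
      }.toMap
    case JsString(s) =>
      Try(Json.parse(s)).toOption match {
        case Some(inner: JsObject) => flatten(inner, prefix) // JSON object stored as a string
        case _                     => Map(prefix -> s)       // ordinary string
      }
    case other =>
      Map(prefix -> other.toString) // numbers, booleans, arrays, null as their JSON text
  }
}
```

Because the function lives in a standalone top-level object and only touches its arguments, it holds no reference to the stream or the Spark contexts.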

My Actual Code

import org.apache.spark.sql.{Encoders, Row}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import play.api.libs.json._

object Cbr_Network_Connection {
  def main(args: Array[String]): Unit = {
    // The definitions of ss, sparkStreamingContext, kafkaConsumerParams,
    // encoder and kafka_producer are not shown here.

    // Recursive function that generates a flattened map from the incoming JSON message
    def json_flattensizer(json: JsValue): Map[String, String] = {
      var map = Map[String, String]()
      // ... recursive flattening logic (omitted) ...
      map
    }

    val kafkaStream = KafkaUtils.createDirectStream(sparkStreamingContext, PreferConsistent, Subscribe[String, String](Seq("topic_name"), kafkaConsumerParams))
    kafkaStream.foreachRDD(foreachFunc = rdd => {
      if (!rdd.isEmpty()) {
        import ss.implicits._
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        val newRDD = rdd.map(consumerRecord => consumerRecord.value())
        val df = ss.read.option("mode", "DROPMALFORMED").json(ss.createDataset(newRDD)(Encoders.STRING))
        val Enricheddf = df.toJSON.map(row => {
          val in_json = Json.parse(row)
          val map = json_flattensizer(in_json) // the java.io serialization error is raised here
          Row(map("name"), map("age"), map("address_door_no"), map("ip_local"))
        })(encoder)
        Enricheddf.toJSON.foreach(record => {
          kafka_producer.send("topic_name", record) // send the flattened data to another Kafka topic
        })
        kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    })
    sparkStreamingContext.start()
    sparkStreamingContext.awaitTermination()
  }
}

But when I run the code above, I get a serialization error:

java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData is being serialized  possibly as a part of closure of an RDD operation. This is because  the DStream object is being referred to from within the closure.  Please rewrite the RDD operation inside this DStream to avoid this.  This has been enforced to avoid bloating of Spark tasks  with unnecessary objects.
        at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply$mcV$sp(DStreamCheckpointData.scala:137)
        at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:124)
        at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:124)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1255)
        at org.apache.spark.streaming.dstream.DStreamCheckpointData.writeObject(DStreamCheckpointData.scala:124)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1155)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializableWithWriteObjectMethod(SerializationDebugger.scala:230)

Is there any way to serialize my recursive Scala function (which uses the Play JSON library)?

OneCricketeer
Ajay007
  • You don't need Play JSON. Just use Structured Streaming's built-in JSON support. You can use a UDF to replace your flatten code – OneCricketeer May 09 '22 at 05:15
  • Thanks for your suggestion, but after flattening I need to store the consumed offsets in HDFS. If the job fails, I need to resume consuming from the offsets stored in HDFS. So I think it can't be done in Structured Streaming. – Ajay007 May 09 '22 at 17:05
  • Why HDFS? Kafka stores offsets on its own for Structured Streaming – OneCricketeer May 09 '22 at 17:52
  • If possible, can you share any documentation? It would be really helpful. Is it possible to restart a Structured Streaming Kafka consumer from the offset where the failed consumer stopped? – Ajay007 May 10 '22 at 04:28
  • That's just default behavior of any Kafka consumer group. I'm not sure what you mean by "failed", though. If you restart a consumer in the same group after a previous offset commit, it'll restart at that offset, yes. For Spark specifically, you can set checkpoints to be stored in HDFS https://stackoverflow.com/questions/64003405/how-to-use-kafka-group-id-and-checkpoints-in-spark-3-0-structured-streaming-to-c/64003569#64003569 – OneCricketeer May 10 '22 at 15:00
  • I found one thing in my code. Whenever I try to commit offsets to Kafka, it raises the issue. `foreachRDD(rdd => { var offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges; val m = jsonparser(somedata); kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) // This line shows Task not serializable })` Can you please help me with this? – Ajay007 May 10 '22 at 16:50
  • `kafkaStream` is defined outside of foreachRDD, so it isn't serializable. But maybe you can try casting `kafkaStream.inputDStream` to `CanCommitOffsets` instead. Once again, Spark Structured Streaming checkpoints can already be stored in HDFS, so I think rewriting the code would be better. – OneCricketeer May 10 '22 at 17:03
  • Thanks for your help. I agree with your idea (Structured Streaming), but the client requirement is to use KafkaStream/Spark Streaming only. Also, in your last comment you said to try casting to `CanCommitOffsets`. Could you please explain briefly? **I have been trying to find a solution for the past few weeks.** Can you help me resolve the Kafka offset commit issue? Please look into my code. – Ajay007 May 11 '22 at 11:32
  • I don't think you're showing the complete stacktrace but 1) You're not committing offsets to HDFS here like you said you needed to 2) Look at the Java example rather than the Scala one. It uses `inputDStream()` before casting https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself . – OneCricketeer May 11 '22 at 13:21
  • I have not started storing offsets in HDFS yet. For now I am trying to commit offsets through the Kafka consumer, because I have set the **"enable.auto.commit"** config to **false** (client requirement). So committing the offsets is what shows the Not serializable error. – Ajay007 May 11 '22 at 13:45
  • Your second comment says you want to use HDFS. Why waste time debugging storing offsets back to Kafka? – OneCricketeer May 11 '22 at 13:54
  • Let me explain. I want to parse the JSON string and get the flattened message. The reason I am storing offsets back to Kafka is that if I do not commit them, the Kafka consumer gets stuck in an infinite loop on the same offsets. So I am committing the offsets to Kafka. After committing, I also need to store the offset values in HDFS. That is the actual requirement. – Ajay007 May 11 '22 at 16:32
  • Yes, I understand your code, you don't need to explain the flattening part. The error is referring to the fact that `kafkaStream` is used inside of its own processing logic. But, out of curiosity, do you get the same error when you comment out all the flattening logic and simply forward the data to the producer and commit those offsets? You've not shown where `kafka_producer` is defined. That would answer whether your flatten function is the problem or not. You could also define the flatten function inside of the `foreachRDD` body and call `flatten(consumerRecord.value())` rather than use a DF – OneCricketeer May 12 '22 at 23:54
  • Hi, the code is working as I expected after changing `df.toJSON.map(row => {})` to `df.toJSON.foreach(row => {})`. I have also added the producer companion object inside the foreach. – Ajay007 May 13 '22 at 13:18
  • I still don't think you should be using a dataframe here. You should be able to directly parse the RDD value field to a `JsObject`, then flatten that all within the same `rdd.map` call – OneCricketeer May 13 '22 at 15:02
  • Thanks for your suggestion to do this inside `rdd.map`. I followed it and changed `rdd.map(...)` to `rdd.foreach(consumerRecord => jsonparser(consumerRecord))`, which seems to be working well. I hope it will not cause any issues while working with Kafka. Thank you – Ajay007 May 16 '22 at 08:54
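
To summarize the comment thread, here is a rough sketch of the pattern it converges on: no DataFrame, parsing and flattening done directly on the record values on the executors, a producer created per partition, and `kafkaStream` only touched on the driver when committing offsets. It is untested against a real cluster. The names `Flattener`, `flattenToJson`, `FlattenJob.run`, and all of the parameters are hypothetical, and `producerProps` is assumed to already contain the bootstrap servers and string serializers.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import play.api.libs.json._
import scala.util.Try

// Top-level object: executors call it statically, so nothing from the
// driver (stream, contexts) is pulled into the task closure.
object Flattener {
  def flatten(js: JsValue, prefix: String = ""): Map[String, String] = js match {
    case obj: JsObject =>
      obj.fields.flatMap { case (k, v) =>
        flatten(v, if (prefix.isEmpty) k else s"${prefix}_$k")
      }.toMap
    case JsString(s) =>
      Try(Json.parse(s)).toOption match {
        case Some(inner: JsObject) => flatten(inner, prefix)
        case _                     => Map(prefix -> s)
      }
    case other => Map(prefix -> other.toString)
  }

  // Returns None for input that is not valid JSON, so bad records are skipped.
  def flattenToJson(raw: String): Option[String] =
    Try(Json.parse(raw)).toOption.map(js => Json.stringify(Json.toJson(flatten(js))))
}

object FlattenJob {
  def run(ssc: StreamingContext, kafkaParams: Map[String, Object],
          producerProps: Properties, inTopic: String, outTopic: String): Unit = {
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq(inTopic), kafkaParams))

    kafkaStream.foreachRDD { rdd =>
      // Driver side: read the offset ranges before any transformation.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Executor side: this closure references only producerProps and outTopic.
      rdd.foreachPartition { records =>
        val producer = new KafkaProducer[String, String](producerProps)
        try {
          records.foreach { record =>
            Flattener.flattenToJson(record.value()).foreach { json =>
              producer.send(new ProducerRecord[String, String](outTopic, json))
            }
          }
        } finally producer.close()
      }

      // Driver side again: commit only after the output action has finished.
      kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
  }
}
```

Creating the producer inside `foreachPartition` avoids shipping a non-serializable producer from the driver, at the cost of one producer per partition per batch; a lazily initialized singleton per executor would be a possible refinement.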

0 Answers