I am working with Kafka + Spark Streaming and the Play JSON library. I need to flatten an incoming complex Kafka message (a JSON string).
Sample Incoming Kafka Message
"""{"name" : "somename" , age : 20 , "address" : "{"door_no" : "20A" , "street" : "new_street"}","ip":{"local":"128.7.0.1"}}"""
As you can see in the message above, there is JSON nested inside JSON, and the address value is itself JSON encoded as a string. So I wrote a recursive function that flattens the incoming message, and that part works perfectly on its own.
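For context, the body of my function is omitted in the code further below; roughly, it does something like this simplified sketch (the names flatten and prefix here are only illustrative, not my exact code). It walks the JsValue, joins nested keys with "_", and re-parses string values that themselves contain JSON:

import play.api.libs.json._
import scala.util.Try

// Simplified sketch only -- my real json_flattensizer is omitted in the code below.
// Nested keys are joined with "_" (e.g. address_door_no); string values that hold
// JSON (like "address") are parsed and flattened as well.
def flatten(js: JsValue, prefix: String = ""): Map[String, String] = js match {
  case obj: JsObject =>
    obj.fields.flatMap { case (k, v) =>
      val key = if (prefix.isEmpty) k else s"${prefix}_$k"
      flatten(v, key)
    }.toMap
  case JsString(s) =>
    // a string value may itself be a JSON document (like "address" in the sample)
    Try(Json.parse(s)).toOption match {
      case Some(nested: JsObject) => flatten(nested, prefix)
      case _                      => Map(prefix -> s)
    }
  case JsNumber(n)  => Map(prefix -> n.toString)
  case JsBoolean(b) => Map(prefix -> b.toString)
  case _            => Map(prefix -> js.toString)
}

With the sample message above this produces keys such as name, age, address_door_no, address_street and ip_local.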
My Actual Code
import scala.collection.Map
import org.apache.spark.sql.{Encoders, Row}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import play.api.libs.json._

object Cbr_Network_Connection {
  def main(args: Array[String]): Unit = {

    // Recursive function that builds a flattened Map from the incoming JSON message
    def json_flattensizer(json: JsValue): Map[String, String] = {
      var map = Map[String, String]()
      // ... recursion over the JsValue (body omitted here)
      map
    }

    // sparkStreamingContext, ss (SparkSession), kafkaConsumerParams, encoder and
    // kafka_producer are created elsewhere in my application
    val kafkaStream = KafkaUtils.createDirectStream(
      sparkStreamingContext,
      PreferConsistent,
      Subscribe[String, String](Seq("topic_name"), kafkaConsumerParams))

    kafkaStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        import ss.implicits._
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        val newRDD = rdd.map(consumerRecord => consumerRecord.value())
        val df = ss.read.option("mode", "DROPMALFORMED").json(ss.createDataset(newRDD)(Encoders.STRING))
        val Enricheddf = df.toJSON.map(record => {
          val in_json = Json.parse(record)
          val map = json_flattensizer(in_json) // java.io.NotSerializableException is thrown here
          Row(map("name"), map("age"), map("address_door_no"), map("ip_local"))
        })(encoder)
        Enricheddf.toJSON.foreach(record => {
          kafka_producer.send("topic_name", record) // sending the flattened data to another Kafka topic
        })
        kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    }

    sparkStreamingContext.start()
    sparkStreamingContext.awaitTermination()
  }
}
But while running the above code I get the following serialization error:
java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.
at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply$mcV$sp(DStreamCheckpointData.scala:137)
at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:124)
at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:124)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1255)
at org.apache.spark.streaming.dstream.DStreamCheckpointData.writeObject(DStreamCheckpointData.scala:124)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1155)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializableWithWriteObjectMethod(SerializationDebugger.scala:230)
Is there any way to serialize my recursive Scala function (which uses the Play JSON library)?