I have written a Spark job that reads one file, converts the data to JSON and posts it to Kafka. I have tried every option I could think of: 1. adding a Thread.sleep at the end of the job, 2. setting linger.ms lower than the sleep time, 3. calling producer.flush() / producer.close(). Nothing works: the job simply does not post anything to Kafka, and no error appears in the log. I can see from the log that my send method is called, and that close is called at the end, still with no error. If I write a plain standalone producer to post a message to the same Kafka topic, it goes through without any issue, so there is no problem with Kafka itself. Please help!
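For reference, the standalone producer I used to verify the topic was roughly like the sketch below (broker address, topic name and message are placeholders, not my real config; it just shows that a blocking send outside Spark works):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object PlainProducerCheck extends App {
  // Same serializer settings as producerProps below; broker and topic are placeholders.
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  // send() is asynchronous; get() blocks until the broker acknowledges the record.
  producer.send(new ProducerRecord[String, String]("topic", """{"check":"standalone"}""")).get()
  producer.flush()
  producer.close()
}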
Here are the important files of the project:
build.sbt:
name := "SparkStreamingExample"
//version := "0.1"
scalaVersion := "2.11.8"
val spark="2.3.1"
val kafka="0.10.1"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.9.6"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.9.6"
dependencyOverrides += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.9.6"
// https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-cbor
dependencyOverrides += "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % "2.9.6"
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "2.0.0"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % spark
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.1"
libraryDependencies +="com.typesafe.play" %"play-json_2.11" % "2.6.6" exclude("com.fasterxml.jackson.core","jackson-databind")
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.1"
libraryDependencies +="com.typesafe" % "config" %"1.3.2"
MySparkKafkaProducer.scala
import java.util.Properties
import java.util.concurrent.Future

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class MySparkKafkaProducer(createProducer: () => KafkaProducer[String, String]) extends Serializable {

  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions: only the factory function is serialized,
     and the producer itself is created lazily on first use. */
  @transient lazy val producer = createProducer()

  def send(topic: String, key: String, value: String): Future[RecordMetadata] = {
    println("inside send method")
    producer.send(new ProducerRecord(topic, key, value))
  }

  def send(topic: String, value: String): Future[RecordMetadata] = {
    // println("inside send method")
    producer.send(new ProducerRecord(topic, value))
  }
}

object MySparkKafkaProducer extends Serializable {

  def apply(config: Properties): MySparkKafkaProducer = {
    val f = () => {
      val producer = new KafkaProducer[String, String](config)
      sys.addShutdownHook {
        println("calling Closeeeeeeeeeee")
        producer.flush()
        producer.close()
      }
      producer
    }
    new MySparkKafkaProducer(f)
  }
}
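Since send() is asynchronous, my understanding is that a failed send would only surface through the returned Future or through a callback; nothing in the wrapper above inspects either. A callback-based send would look roughly like the sketch below (SendWithCallbackSketch is an illustrative name, not code that is in my project):

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

object SendWithCallbackSketch {
  // Sends one record and logs the broker's acknowledgement, or the error, once it arrives.
  def sendWithCallback(producer: KafkaProducer[String, String], topic: String, value: String): Unit = {
    producer.send(new ProducerRecord[String, String](topic, value), new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        if (exception != null)
          println(s"send to $topic failed: ${exception.getMessage}")
        else
          println(s"acked: topic=${metadata.topic()} partition=${metadata.partition()} offset=${metadata.offset()}")
      }
    })
  }
}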
AlibababaMainJob.scala
import java.util.Properties

import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object AlibababaMainJob {

  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder().master("local[*]").appName("AlibabaCoreJob").getOrCreate()
    val conf = new SparkConf().setMaster("local[2]").setAppName("AlibabaCoreJob")
    //val ssc = new StreamingContext(conf, Seconds(1))
    //val ssc = new StreamingContext(getSparkConf(), 6)
    val coreJob = new AlibabaCoreJob()
    val configuration = Configuration.apply(ConfigFactory.load.resolve)

    // The producer wrapper is broadcast so each executor builds its own KafkaProducer lazily.
    implicit val rollUpProducer: Broadcast[MySparkKafkaProducer] =
      ss.sparkContext.broadcast(MySparkKafkaProducer(producerProps(configuration)))

    println(s"==========Kafka Config======${configuration.kafka}")
    coreJob.processRecordFromFile(ss, rollUpProducer)
    Thread.sleep(1000)
    //ssc.start()
    //println(s"==========Spark context Started======${ssc.sparkContext.appName}")
    //ssc.awaitTermination()
    //Set up Kafka configuration: https://stackoverflow.com/questions/31590592/spark-streaming-read-and-write-on-kafka-topic
  }

  def producerProps(jobConfig: Configuration, extras: (String, String)*): Properties = {
    val p = new Properties()
    p.put("bootstrap.servers", jobConfig.kafka.brokerList)
    p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    p.put("acks", "all")
    p.put("retries", "3")
    p.put("linger.ms", "1")
    p
  }
}
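Configuration is my own small wrapper around Typesafe Config and is not shown above; only kafka.brokerList is actually read by producerProps. Its shape is roughly the sketch below (the exact config keys are an assumption, not my real file):

import com.typesafe.config.Config

// Sketch only: the real Configuration class is not included in the question.
// The key "kafka.brokerList" is an assumed name for the broker-list setting.
case class KafkaConfig(brokerList: String)
case class Configuration(kafka: KafkaConfig)

object Configuration {
  def apply(config: Config): Configuration =
    Configuration(KafkaConfig(brokerList = config.getString("kafka.brokerList")))
}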
AlibabaCoreJob.scala
import java.util.Properties

import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import play.api.libs.json._

class AlibabaCoreJob extends Serializable {

  // implicit val transctionWrites = Json.writes[Transction]
  // case class Transction(productCode: String, description: String, brand: String, category: String, unitPrice: String, ordItems: String, mode: String) extends Serializable

  def processRecordFromFile(ss: SparkSession, kafkaProducer: Broadcast[MySparkKafkaProducer]): Unit = {
    println("Entering processRecordFromFile")
    val rddFromFile = ss.sparkContext.textFile("src/main/resources/12_transactions_case_study.csv")
    println("Loaded file")
    // Skip the header line of the first partition (drop(1); drop(0) would keep it).
    val fileAfterHeader = rddFromFile.mapPartitionsWithIndex(
      (idx, iterator) => if (idx == 0) iterator.drop(1) else iterator)
    println("Removed header")
    processRdd(fileAfterHeader, kafkaProducer)
  }

  //Set up Kafka configuration: https://stackoverflow.com/questions/31590592/spark-streaming-read-and-write-on-kafka-topic
  def processRdd(fileAfterHeader: RDD[String], kafkaProducer: Broadcast[MySparkKafkaProducer]) = {
    println("Entering processRdd")
    // Parse each CSV line into a Transction record.
    val rddList = fileAfterHeader.mapPartitions(
      line => {
        line.map(x => x.split(",")).map(y => Transction(y(0), y(1), y(2), y(3), y(4), y(5), y(6))).toList.toIterator
      })
    // Serialize each record to JSON and post it to Kafka via the broadcast producer.
    rddList.foreach(lineitem => {
      val jsonString: String = Json.stringify(Json.toJson(lineitem))
      kafkaProducer.value.send("topic", jsonString)
    })
  }

  //Suppose you want to drop the first 3 lines from the file:
  // val da = fi.mapPartitionsWithIndex { (id_x, iter) => if (id_x == 0) iter.drop(3) else iter }
  //Create a RowRDD by mapping each line to the required fields:
  // val rowRdd = da.map(x => Row(x(0), x(1)))
}
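The Transction case class used above is defined elsewhere in the project; based on the commented-out lines in AlibabaCoreJob it is essentially the sketch below (the companion-object placement of the Writes is my own arrangement, not necessarily how it is laid out in the real file):

import play.api.libs.json.{Json, OWrites}

// Sketch of the Transction case class referenced in processRdd; field names
// follow the commented-out definition in AlibabaCoreJob.
case class Transction(productCode: String, description: String, brand: String, category: String,
                      unitPrice: String, ordItems: String, mode: String) extends Serializable

object Transction {
  // play-json Writes so that Json.toJson(lineitem) in processRdd compiles.
  implicit val transctionWrites: OWrites[Transction] = Json.writes[Transction]
}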