
I'm working on a Spark Streaming project that transports data from one Kafka cluster to another, and I'm puzzled about the writing-to-Kafka part. I learned from Spark Streaming - read and write on Kafka topic to wrap the producer and send it to all executors. It works in local mode, but fails in yarn mode, no matter whether yarn-client or yarn-cluster.

Here is my producer-wrapper code:

import org.apache.kafka.clients.producer._

// Serializable KafkaProducer wrapper: only the factory function is
// serialized and shipped to the executors; the producer itself is
// created lazily, once per executor JVM, on first use.
class KProducer(createProducer: () => Producer[String, String]) extends Serializable {
  lazy val producer: Producer[String, String] = createProducer()

  // Send (topic, value) asynchronously, ignoring the delivery result
  def aksend(topic: String, value: String): Unit =
    producer.send(
      new ProducerRecord[String, String](topic, value),
      new Callback {
        override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {}
      })
}

object KProducer {
  def apply(config: java.util.Properties): KProducer = {
    val createFunc = () => new KafkaProducer[String, String](config)
    new KProducer(createFunc)
  }
}
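
For context, here is roughly how such a wrapper gets shipped to the executors — a minimal sketch, assuming a DStream[String] source; the StreamingContext, stream, and the topic name "output-topic" are placeholder assumptions, not taken from the post:

import java.util.Properties
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

object KProducerUsage {
  def writeOut(ssc: StreamingContext, stream: DStream[String], config: Properties): Unit = {
    // Only the serializable wrapper crosses the wire; the lazy producer
    // is instantiated on each executor on the first aksend call.
    val kp = ssc.sparkContext.broadcast(KProducer(config))

    stream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        records.foreach(value => kp.value.aksend("output-topic", value))
      }
    }
  }
}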

And I created a simple demo on GitHub here.

Can anybody help me with this problem, please?

kn1011
  • Could you please share the error/exception? – Lalit Dec 15 '19 at 00:43
  • I suggest you put *something* in `onCompletion` to know if the message was actually sent. Also, Structured Streaming can write to Kafka without creating a Producer on your own (see the sketch after these comments), so what does your other code look like? ++ "from kafka to another kafka" - Have you tried using MirrorMaker for that? – OneCricketeer Dec 15 '19 at 08:47
  • No exception or error when running in yarn mode. It just stays jammed on the first batch of data the whole time. @Lalit – kn1011 Dec 15 '19 at 12:25
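
For reference, the built-in Kafka sink that OneCricketeer mentions would look roughly like this — a sketch with placeholder broker names, topic names, and checkpoint path:

import org.apache.spark.sql.SparkSession

object KafkaToKafka {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-to-kafka").getOrCreate()

    // Read from the source cluster (broker/topic names are placeholders)
    val in = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "source-broker:9092")
      .option("subscribe", "input-topic")
      .load()

    // Write to the target cluster; Spark manages the producers itself
    val query = in.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "target-broker:9092")
      .option("topic", "output-topic")
      .option("checkpointLocation", "/tmp/kafka-to-kafka-checkpoint")
      .start()

    query.awaitTermination()
  }
}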

1 Answer


So stupid of me! I forgot to add the Kafka cluster hostnames. After I added the hostnames, the program worked fine.

kn1011
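
For anyone hitting the same symptom (first batch hangs with no errors), a minimal producer config sketch. The broker hostnames below are placeholders; every YARN executor host, not just the driver, must be able to resolve them (e.g. via DNS or /etc/hosts), otherwise the producer blocks while fetching metadata:

import java.util.Properties

val config = new Properties()
// Placeholder hostnames; unresolvable brokers make send() block silently
config.put("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092")
config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = KProducer(config)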