I'm working on a Spark Streaming project that transports data from one Kafka cluster to another, and I'm puzzled about the writing-to-Kafka part. Following Spark Streaming - read and write on Kafka topic, I wrap the producer and ship it to all executors. It works in local mode, but it fails in yarn mode, no matter whether yarn-client or yarn-cluster.
Here is my producer-wrapper code:
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, Producer, ProducerRecord, RecordMetadata}

// Serializable KafkaProducer wrapper: the producer is created lazily via the
// factory function on each executor, so it is never shipped over the wire.
class KProducer(createProducer: () => Producer[String, String]) extends Serializable {

  lazy val producer: Producer[String, String] = createProducer()

  // Asynchronously send (topic, value)
  def aksend(topic: String, value: String): Unit =
    producer.send(
      new ProducerRecord[String, String](topic, value),
      new Callback {
        override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {}
      })
}

object KProducer {
  def apply(config: java.util.Properties): KProducer = {
    val createFunc = () => new KafkaProducer[String, String](config)
    new KProducer(createFunc)
  }
}
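
For context, here is a minimal sketch of how I understand the wrapper is meant to be broadcast from the driver and used on the executors; the broker address, topic name, and the socket source below are placeholders, not my real configuration:

import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Demo {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("kafka-to-kafka"), Seconds(5))

    val props = new Properties()
    props.put("bootstrap.servers", "target-broker:9092") // placeholder broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // Broadcast the serializable wrapper; the real producer is created
    // lazily on each executor the first time aksend is called there.
    val producerBc = ssc.sparkContext.broadcast(KProducer(props))

    val lines = ssc.socketTextStream("localhost", 9999) // placeholder source stream
    lines.foreachRDD { rdd =>
      rdd.foreach { record =>
        producerBc.value.aksend("target-topic", record) // placeholder topic
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}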
I also created a simple demo on GitHub here.
Can anybody help me with this problem, please?