I am using reactive-kafka-core 0.10.1 (targeting Kafka 0.9.x). It looks like the Kafka producer actor is stopped whenever the send callback reports an error. Is there any way to customize this behavior? Our use case is to try to recover and resend the failed messages. This is the code in question:

private def processElement(element: ProducerMessage[K, V]) = {
  val record = richProducer.props.partitionizer(element.value) match {
    case Some(partitionId) => new ProducerRecord(richProducer.props.topic, partitionId, element.key, element.value)
    case None => new ProducerRecord(richProducer.props.topic, element.key, element.value)
  }
  richProducer.producer.send(record, new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception) = {
      if (exception != null) {
        handleError(exception)
      }
    }
  })
  ()
}

private def handleError(ex: Throwable) = {
  log.error(ex, "Stopping Kafka subscriber due to fatal error.")
  stop()
}
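To make the desired behavior concrete, here is a minimal sketch of a callback that resends a failed record a bounded number of times and only gives up afterwards. It is not reactive-kafka's API: `RetryingSend`, `SendFn`, `sendWithRetry` and `onGiveUp` are hypothetical names, and `SendFn` merely stands in for a call such as `richProducer.producer.send(record, callback)`, reporting `null` on success in the same way `Callback.onCompletion` does.

```scala
// Hypothetical sketch, not part of reactive-kafka or the Kafka client.
object RetryingSend {
  // Stand-in for producer.send(record, callback): the second argument is
  // invoked with null on success or with the failure otherwise.
  type SendFn[R] = (R, Exception => Unit) => Unit

  // Sends `record`; on failure, resends it up to `retriesLeft` more times.
  // `onGiveUp` is called only once all attempts have failed.
  def sendWithRetry[R](send: SendFn[R], record: R, retriesLeft: Int)(onGiveUp: Exception => Unit): Unit =
    send(record, { exception =>
      if (exception != null) {
        if (retriesLeft > 0) sendWithRetry(send, record, retriesLeft - 1)(onGiveUp)
        else onGiveUp(exception)
      }
    })
}
```

In terms of the code above, `onGiveUp` would play the role that `handleError` plays today, so the actor would stop only after the retries are exhausted. Note that resending from inside the callback does not preserve the original ordering of records.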

user1373186

0 Answers