I use Kafka 0.10.1.0.
This is my producer:
val props: Properties = ...
val producer = new KafkaProducer[String, AnyRef](props)
val callback = new Callback {
override def onCompletion(md: RecordMetadata, e: Exception): Unit = ...
}
producer.send(new ProducerRecord[String, AnyRef]("topic", "hello"), callback)
But the above callback can't handle java.net.ConnectException: Connection refused
in case when kafka-server is down.
UPD
The ConnectionException
is raised in another thread (into Sender
class which is used into KafkaProducer
). Therefore we can't use try {} catch
for it. Also I don't need retry mechanism, I need a way for handling this situation (for example, if Kafka is down and producer can't send message then I'm going to use some another Queue API).
Is there a way for handling this exception?