
I use Kafka 0.10.1.0.

This is my producer:

val props: Properties = ...
val producer = new KafkaProducer[String, AnyRef](props)
val callback = new Callback {
  override def onCompletion(md: RecordMetadata, e: Exception): Unit = ...
}
producer.send(new ProducerRecord[String, AnyRef]("topic", "hello"), callback)

But the callback above can't handle `java.net.ConnectException: Connection refused` when the Kafka broker is down.

UPD

The `ConnectException` is raised on another thread (in the `Sender` class used internally by `KafkaProducer`), so we can't catch it with `try {} catch`. I also don't need a retry mechanism; I need a way to handle this situation (for example, if Kafka is down and the producer can't send the message, I'll fall back to some other queue API).

Is there a way for handling this exception?

John Mullins
1 Answer


You have a few options. Scala provides a way to catch exceptions which takes the following form:

   try {
     // ...
   } catch {
     case ioe: IOException => ... // more specific cases first!
     case e: Exception => ...
   }

So the simplest approach would be:

   try {
     producer.send(new ProducerRecord[String, AnyRef]("topic", "hello"), callback)
   } catch {
     case ce: java.net.ConnectException => // handle exception
   }

More complex but more robust would be a retry mechanism:

What's the Scala way to implement a retry-able call like this one?
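Following that question, a minimal retry helper can be written with `Try` and tail recursion (a sketch; `retry` and its signature are my own naming, not a library API):

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Generic retry: run `op` up to `times` attempts, returning the first
// success or, if every attempt fails, the last failure.
@tailrec
def retry[T](times: Int)(op: => T): Try[T] =
  Try(op) match {
    case s @ Success(_)          => s
    case Failure(_) if times > 1 => retry(times - 1)(op)
    case f @ Failure(_)          => f
  }
```

A failed `retry(n) { producer.send(record).get() }` would then be the point at which to divert the message to the fallback queue.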

Also note that the Kafka Producer has a retry mechanism built into it, which may also prove helpful:

Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first.
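For reference, those producer settings can be wired in like this (a sketch; the broker address and retry count are placeholder values):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker
props.put(ProducerConfig.RETRIES_CONFIG, "3")                        // resend on transient errors
// keep record ordering intact while retries are enabled:
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
```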

crypto
  • Hi @crypto, thanks for the answer. But this won't work because the `ConnectException` is raised on another thread (in the `Sender` class used inside `KafkaProducer`). Therefore we can't use `try {} catch` for it. – John Mullins Nov 30 '16 at 09:48
  • yes, this is a separate thread, created inside `producer.send` method – John Mullins Dec 05 '16 at 11:16
  • So the exception is thrown before your callback is invoked? – crypto Dec 05 '16 at 20:04
  • When the exception is thrown, the callback is not invoked. The exception appears only in the logs, and we can't catch it. – John Mullins Dec 05 '16 at 21:16
  • I can't reproduce the error. metadata.fetch.timeout.ms = 60000 happens before I get a ConnectionException. I've tried an invalid host:port combination and the metadata fetch is timing out before i see any ConnectionException – crypto Dec 06 '16 at 04:00