0

I have an application that is using future for asynchronous execution.
I set the parameter on get method, that the thread should get killed after 10 seconds, when it does not get the response:

 Future<RecordMetadata> meta = producer.send(record, new ProducerCallBack());
      RecordMetadata data = meta.get(10, TimeUnit.SECONDS);  

But the thread get killed after 60 second:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
    at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1124)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:823)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:760)
    at io.khinkali.KafkaProducerClient.main(KafkaProducerClient.java:49)
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

What am I doing wrong?

Gray
  • 115,027
  • 24
  • 293
  • 354
softshipper
  • 32,463
  • 51
  • 192
  • 400
  • Use some higher value for the timeout – Stimpson Cat Nov 17 '17 at 08:09
  • I change it to 10 SECONDS. – softshipper Nov 17 '17 at 08:10
  • https://stackoverflow.com/questions/38457706/error-error-when-sending-message-to-topic – Sam Orozco Nov 17 '17 at 08:11
  • I did not start the kafka server, because I wanted to figure out, if the exception get caught. – softshipper Nov 17 '17 at 08:15
  • There's two different timeouts here. The 10 seconds does not specify that it should abort after waiting. The timeout you give to "get" only means "wait for the result for 10 seconds, if it's not ready by then, just continue." – PeterT Nov 17 '17 at 08:15
  • And how to kill the thread? – softshipper Nov 17 '17 at 08:16
  • I don't know how Kafka works, but I would say that you should set the timeout setting in there. But if it supports canceling you could try `if(!meta.isDone()){meta.cancel(true);} ` after your get. But like I said, you should look at the kafka documentation how to set the timeout correctly. – PeterT Nov 17 '17 at 08:20

2 Answers2

2

From the docs:

The threshold for time to block is determined by max.block.ms after which it throws a TimeoutException.

Check Kafka Appender config in logback.xml, look for:

<producerConfig>max.block.ms=60000</producerConfig>
Nir Alfasi
  • 53,191
  • 11
  • 86
  • 129
1

I set the parameter on get method, that the thread should get killed after 10 seconds, when it does not get the response:

If we are talking about Future.get(...) there is nothing about it that "kills" the thread at all. To quote from the javadocs, the Future.get(...) method:

Waits if necessary for at most the given time for the computation to complete, and then retrieves its result, if available.

If the get(...) method times out then it will throw TimeoutException but your thread is free to continue to run. If you want to stop the thread running then you'll need to catch TimeoutException and then call meta.cancel(true) but even that doesn't guarantee that the thread will be "killed". That causes the thread to be interrupted which means that certain methods will throw InterruptedException or the thread needs to be checking for Thread.currentThread().isInterrupted().

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

Yeah this timeout has nothing to do with the Future.get(...) timeout.

Gray
  • 115,027
  • 24
  • 293
  • 354