
I'm using Spring-Kafka version 1.2.1 and, when the Kafka server is down/unreachable, the asynchronous send calls block for a while. It seems to be the TCP connection timeout. The code is something like this:

ListenableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message);
future.addCallback(new ListenableFutureCallback<SendResult<K, V>>() {
    @Override
    public void onSuccess(SendResult<K, V> result) {
        ...
    }

    @Override
    public void onFailure(Throwable ex) {
        ...
    }
});

I've taken a really quick look at the Spring-Kafka code and it seems to just pass the task along to the kafka client library, translating a callback interaction to a future object interaction. Looking at the kafka client library, the code gets more complex and I didn't take the time to understand it all, but I guess it may be making remote calls (metadata, at least?) in the same thread.

As a user, I expected the Spring-Kafka methods that return a future to return immediately, even if the remote kafka server is unreachable.

Confirmation of whether my understanding is wrong, or whether this is a bug, would be welcome. I ended up making the call asynchronous on my end for now.
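For illustration, such a workaround can be sketched with plain `java.util.concurrent` (a minimal sketch, not my exact code; the class and method names are made up for the example, and `sendCall` stands in for the blocking `kafkaTemplate.send(...)` invocation):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncSendWrapper {

    // Dedicated thread so a blocked send never stalls the caller.
    private static final ExecutorService SEND_EXECUTOR = Executors.newSingleThreadExecutor();

    // Runs the (potentially blocking) send call on the executor and
    // returns immediately with a future for the result.
    public static <T> CompletableFuture<T> sendAsync(Callable<T> sendCall) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return sendCall.call();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }, SEND_EXECUTOR);
    }

    public static void main(String[] args) throws Exception {
        // Here sendCall would be () -> kafkaTemplate.send(topic, key, message).get();
        // a sleep simulates the blocking metadata fetch when the broker is down.
        CompletableFuture<String> future = sendAsync(() -> {
            Thread.sleep(200);
            return "ack";
        });
        System.out.println("sendAsync returned immediately");
        System.out.println(future.get());
        SEND_EXECUTOR.shutdown();
    }
}
```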

Another problem is that the Spring-Kafka documentation says, at the beginning, that it provides synchronous and asynchronous send methods. I couldn't find any methods that do not return futures; maybe the documentation needs updating.

I'm happy to provide any further details if needed. Thanks.

5 Answers


In addition to the @EnableAsync annotation on a configuration class, the @Async annotation needs to be used on the method where you invoke this code.

http://www.baeldung.com/spring-async

Here are some code fragments. Kafka producer config:

@EnableAsync
@Configuration
public class KafkaProducerConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);

    @Value("${kafka.brokers}")
    private String servers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper) {
        return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer(objectMapper));
    }

    @Bean
    public KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper) {
        return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper));
    }

    @Bean
    public Producer producer() {
        return new Producer();
    }
}

And the producer itself:

public class Producer {

    public static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, GenericMessage> kafkaTemplate;

    @Async
    public void send(String topic, GenericMessage message) {
        ListenableFuture<SendResult<String, GenericMessage>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, GenericMessage>>() {

            @Override
            public void onSuccess(final SendResult<String, GenericMessage> message) {
                LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(final Throwable throwable) {
                LOGGER.error("unable to send message= " + message, throwable);
            }
        });
    }
}
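A note on the @Async part: unless an executor is configured, Spring falls back to a default executor that spawns a new thread per call, so blocked sends can pile up threads. A bounded pool can be declared alongside the other beans (a sketch; the pool sizes and the bean name are illustrative, not from the original answer):

```java
@Bean(name = "kafkaSendExecutor")
public Executor kafkaSendExecutor() {
    // Bounded pool so blocked sends cannot spawn unbounded threads.
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(2);
    executor.setMaxPoolSize(4);
    executor.setQueueCapacity(100);
    executor.setThreadNamePrefix("kafka-send-");
    executor.initialize();
    return executor;
}
```

The send method would then be annotated with @Async("kafkaSendExecutor") to use this pool.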
Jorge C
  • Thank you for your response. No, I'm not using these annotations; there was nothing about them in the documentation. I'll try both and let you know if it solves the problem. – Carlos E. L. Augusto Jul 17 '17 at 16:05
  • Using @EnableAsync unfortunately didn't change anything. Also, from the link I understand that it is the spring-kafka library that should be using the @Async annotation, as it is the one providing me the future object. – Carlos E. L. Augusto Jul 18 '17 at 13:02
  • I agree with you; to me it does not make sense that you provide futures but I have to place the annotations anyway. In our case, placing those two annotations made it work like a charm. I'll edit the response to add some code fragments. – Jorge C Jul 19 '17 at 08:29
  • My understanding of your code fragments is that you made your own code asynchronous by using the Spring way to do it. It works for you because the send method returns void, so Spring executes its body inside a new thread and returns immediately to send's caller. However, the spring-kafka calls you make remain synchronous. Your code is similar to what I did to solve the problem, but I used plain Java because I needed more control. With both your solution and mine, I would expect the execution flow to change threads twice if spring-kafka did it right. – Carlos E. L. Augusto Jul 19 '17 at 18:48

If I look at the KafkaProducer itself, there are two parts of sending a message:

  1. Storing the message into the internal buffer.
  2. Uploading the message from the buffer into Kafka.

KafkaProducer is asynchronous for the second part, not the first part.

The send() method can still block on the first part and eventually throw a TimeoutException, e.g.:

  • The metadata for the topics is not cached or stale, so the producer tries to get the metadata from the server to know if the topic still exists and how many partitions it has.
  • The buffer is full (32MB by default).

If the server is completely unresponsive, you will probably encounter both issues.
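Both waits are bounded by producer configuration; here is a sketch of the relevant properties (the values are illustrative, not recommendations):

```java
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
// max.block.ms caps how long send() may block waiting for metadata
// or for buffer space before throwing a TimeoutException (default 60000 ms).
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
// buffer.memory is the total size of the internal send buffer (default 32 MB).
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32L * 1024 * 1024);
```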

Update:

I tested and confirmed this in Kafka 2.2.1. It looks like this behaviour might be different in 2.4 and/or 2.6: KAFKA-3720

GeertPt
  • Wow, almost 3 years for an actual answer, thanks. I don't even use this anymore but I actually took the time to look up the current docs now to see if this was described there, but it isn't. This documentation certainly needs improving. – Carlos E. L. Augusto Jul 10 '20 at 00:49
  • Sorry, I pushed enter and sent the comment, then took more than 5 minutes to edit... so here's the rest: I didn't expect this behavior; I expected to get any errors from the future object when using the asynchronous send method. Returning a future in both the synchronous and asynchronous versions also didn't help... thanks again for taking your time. – Carlos E. L. Augusto Jul 10 '20 at 00:56
  • I actually encountered this issue myself, and answered your question before I realized it was 3 years old :-) – GeertPt Jul 10 '20 at 09:06
  • Reading the issue you mentioned, the behavior seems justified. However, this really needs to be explained in the docs. But as was said there, even the tests were expecting a different behavior :P – Carlos E. L. Augusto Jul 11 '20 at 14:06

The best solution is to add a callback listener at the level of the producer, by registering a ProducerListener on the KafkaTemplate:

@Bean
public KafkaTemplate<String, WebUserOperation> operationKafkaTemplate() {
    KafkaTemplate<String, WebUserOperation> kt = new KafkaTemplate<>(operationProducerFactory());
    kt.setProducerListener(new ProducerListener<String, WebUserOperation>() {

        @Override
        public void onSuccess(ProducerRecord<String, WebUserOperation> record, RecordMetadata recordMetadata) {
            System.out.println("### Callback :: " + recordMetadata.topic() + " ; partition = " 
                    + recordMetadata.partition()  +" with offset= " + recordMetadata.offset()
                    + " ; Timestamp : " + recordMetadata.timestamp() + " ; Message Size = " + recordMetadata.serializedValueSize());
        }

        @Override
        public void onError(ProducerRecord<String, WebUserOperation> producerRecord, Exception exception) {
            System.out.println("### Topic = " + producerRecord.topic() + " ; Message = " + producerRecord.value().getOperation());
            exception.printStackTrace();
        }
    });
    return kt;
}
  • I know I'm nearly 18 months late, but it would be helpful if you/someone could explain how this solves the problem described. – ranban282 Oct 31 '21 at 15:03

Just to be sure: do you have the @EnableAsync annotation applied? I suspect that could be the key to specifying the behavior of the Future<>.

Chad

The code below works for me to get the response asynchronously:

    ProducerRecord<UUID, Person> record = new ProducerRecord<>(kafkaTemplate.getDefaultTopic(), messageKey, person);
    Runnable runnable = () -> kafkaTemplate.send(record).addCallback(new MessageAckHandler());
    new Thread(runnable).start();

  public class MessageAckHandler implements ListenableFutureCallback<SendResult<UUID,Person>> {

    @Override
    public void onFailure(Throwable exception) {
      log.error("unable to send message: " + exception.getMessage());
     }

     @Override
     public void onSuccess(SendResult<UUID, Person> result) {
       log.debug("sent message with offset={} messageID={}", result.getRecordMetadata().offset(), result.getProducerRecord().key());
     }
  }

   public class SendResult<K, V> {

     private final ProducerRecord<K, V> producerRecord;

     private final RecordMetadata recordMetadata;

     public SendResult(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
        this.producerRecord = producerRecord;
        this.recordMetadata = recordMetadata;
    }

    public ProducerRecord<K, V> getProducerRecord() {
       return this.producerRecord;
    }

    public RecordMetadata getRecordMetadata() {
       return this.recordMetadata;
    }

    @Override
    public String toString() {
       return "SendResult [producerRecord=" + this.producerRecord + ", recordMetadata=" + this.recordMetadata + "]";
    }

 }
AnupKurup
    Well this works of course, but you're just making it asynchronous yourself which was exactly what I did back then and said in the original question. Back then I wanted to know if this was a bug in spring kafka or if I was missing something... – Carlos E. L. Augusto Jul 09 '20 at 16:08
  • @CarlosE.L.Augusto: please share what you have done to handle your issue in code. – Praveen kumar singhal Jun 21 '21 at 09:34