34

The KafkaProducer send method both returns a Future and accepts a Callback.

Is there any fundamental difference between using one mechanism over the other to execute an action upon completion of the sending?

Thilo
  • Maybe there are ordering guarantees on the callbacks that you would have to manually implement with the sequence of returned Futures? – Thilo Apr 27 '17 at 11:49

5 Answers

36

Looking at the documentation you linked to, it looks like the main difference between the Future and the Callback lies in who initiates the "request is finished, what now?" question.

Let's say we have a customer C and a baker B. And C is asking B to make him a nice cookie. Now there are 2 possible ways the baker can return the delicious cookie to the customer.

Future

The baker accepts the request and tells the customer: Ok, when I'm finished I'll place your cookie here on the counter. (This agreement is the Future.)

In this scenario, the customer is responsible for checking the counter (Future) to see if the baker has finished his cookie or not.

Blocking: The customer stays near the counter and watches it until the cookie is put there (Future.get()) or the baker puts an apology there instead (Error: Out of cookie dough).

Non-blocking: The customer does some other work, and once in a while checks whether the cookie is waiting for him on the counter (Future.isDone()). If the cookie is ready, the customer takes it (Future.get()).
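As a rough sketch of the two ways to consume a Future, here is the counter scenario written with plain java.util.concurrent rather than Kafka itself. The names CounterDemo, bake, waitAtCounter and pollCounter are made up for this illustration, and the baker is simply a background thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CounterDemo {
    // Hypothetical stand-in for an asynchronous request: the baker works on another thread.
    static Future<String> bake(ExecutorService baker) {
        return baker.submit(() -> {
            Thread.sleep(50); // pretend baking takes a while
            return "cookie";
        });
    }

    // Blocking: stand at the counter until the cookie (or an apology) shows up.
    static String waitAtCounter() throws Exception {
        ExecutorService baker = Executors.newSingleThreadExecutor();
        try {
            return bake(baker).get();
        } finally {
            baker.shutdown();
        }
    }

    // Non-blocking: do other work and glance at the counter now and then.
    static String pollCounter() throws Exception {
        ExecutorService baker = Executors.newSingleThreadExecutor();
        try {
            Future<String> counter = bake(baker);
            while (!counter.isDone()) {
                Thread.sleep(5); // stands in for the customer's other work
            }
            return counter.get(); // returns immediately, the result is already there
        } finally {
            baker.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(waitAtCounter());
        System.out.println(pollCounter());
    }
}
```

In both variants it is the customer's thread that eventually calls get(); the only difference is whether it waits inside get() or checks isDone() first.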

Callback

In this scenario the customer, after ordering his cookie, tells the baker: When my cookie is ready please give it to my pet robot dog here, he'll know what to do with it (This robot is the Callback).

Now the baker, when the cookie is ready, gives the cookie to the dog and tells him to run back to its owner. The baker can continue baking the next cookie for another customer.

The dog runs back to the customer and starts wagging its artificial tail to make the customer aware that his cookie is ready.

Notice how the customer didn't have any idea when the cookie would be given to him, nor was he actively polling the baker to see if it was ready.
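The robot dog can be sketched in the same way. This is only an illustration with plain java.util.concurrent: CookieCallback is a hypothetical interface shaped like a completion callback (a result or an error), and the thread named "baker" stands in for whoever completes the request.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class RobotDogDemo {
    // Hypothetical callback type: it receives either a result or an error.
    interface CookieCallback {
        void onCompletion(String cookie, Exception error);
    }

    // The baker runs the callback itself, on its own thread, once the cookie is done.
    static void order(ExecutorService baker, CookieCallback dog) {
        baker.execute(() -> dog.onCompletion("cookie", null));
    }

    // Returns "<cookie>@<name of the thread that ran the callback>".
    static String orderAndReport() throws InterruptedException {
        ExecutorService baker = Executors.newSingleThreadExecutor(r -> new Thread(r, "baker"));
        AtomicReference<String> delivered = new AtomicReference<>();
        CountDownLatch tailWag = new CountDownLatch(1);
        try {
            order(baker, (cookie, error) -> {
                delivered.set(cookie + "@" + Thread.currentThread().getName());
                tailWag.countDown();
            });
            // The customer is free to do anything here; the latch only exists
            // so this demo can wait for the result before returning it.
            tailWag.await();
            return delivered.get();
        } finally {
            baker.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(orderAndReport());
    }
}
```

The reported thread name shows that the callback body ran on the baker's thread, not on the thread that placed the order.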

That's the main difference between the 2 scenarios: who is responsible for initiating the "your cookie is ready, what do you want to do with it?" question. With the Future, the customer is responsible for checking when it's ready, either by actively waiting, or by polling every now and then. In the case of the callback, the baker will call back to the provided function.


I hope this answer gives you better insight into what a Future and a Callback actually are. Once you've got the general idea, you could try to find out on which thread each specific thing is handled, when a thread is blocked, and in what order everything completes. Writing some simple programs that print statements like "main client thread: cookie received" could be a fun way to experiment with this.

Imus
22

The asynchronous approach

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata rm, Exception ex) {...}
});

gives you better throughput than the synchronous approach

RecordMetadata rm = producer.send(record).get();

since you don't wait for acknowledgements in the first case.

Also in asynchronous way ordering is not guaranteed, whereas in synchronous it is - message is sent only after acknowledgement received.

Another difference is that with a synchronous call you can stop sending messages straight away after an exception occurs, whereas in the asynchronous case some messages will already have been sent before you discover that something is wrong and can react.

Also note that in the asynchronous approach the number of messages which are "in flight" is controlled by the max.in.flight.requests.per.connection parameter.
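For illustration, the settings involved might be assembled like this. The broker address is a placeholder and the values are examples only, not recommendations; the class and method names are made up, while the keys are standard producer configuration names.

```java
import java.util.Properties;

public class ProducerSettings {
    static Properties orderedProducerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        // Upper bound on unacknowledged requests per broker connection.
        // With a value of 1, a retried batch cannot be overtaken by a later one.
        props.put("max.in.flight.requests.per.connection", "1");
        props.put("retries", "3");
        props.put("acks", "all");
        return props;
    }

    public static void main(String[] args) {
        orderedProducerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

A value above 1 allows more requests in flight at once, at the risk of reordering when a failed send is retried.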

Apart from the synchronous and asynchronous approaches you can use the Fire and Forget approach, which makes the same send() call as the synchronous approach but ignores the returned Future: just send the message and hope that it will reach the broker (most likely it will, and the producer will retry in case of recoverable errors), but there is a chance that some messages will be lost:

producer.send(record);

To summarize:

  • Fire and Forget - fastest one, but some messages could be lost;
  • Synchronous - slowest, use it if you cannot afford to lose messages;
  • Asynchronous - something in between.
streetturtle
  • Wouldn't it also be asynchronous if I don't call `get` on the returned `Future` right away? – Thilo Apr 28 '17 at 00:07
  • But I guess the ability to make it synchronous is an advantage of the Future method. – Thilo Apr 28 '17 at 00:08
  • Add the bit about the callbacks having an ordering guarantee, and then this is the accepted answer. – Thilo May 05 '17 at 00:50
  • Mmm, added @Thilo – streetturtle May 20 '17 at 00:01
  • In all the examples i am seeing that everytime we do a producer.send() we are also creating a new object of callback. Can callback be created once and used for all the send(). Like CallBack c = new Callback(); producer.send(message,c); – wandermonk Apr 25 '18 at 16:53
  • Please explain. – wandermonk Apr 25 '18 at 16:54
  • @PhaniKumarYadavilli, yes you can create a single callback and share between multiple send() invocations. – Karthik Murugan Jun 12 '18 at 04:11
  • "Also in asynchronous way ordering is not guaranteed, whereas in synchronous it is - message is sent only after acknowledgement received.". I disagree - look at Shiraaz.M's answer. Ordering is always guaranteed if you send records from the same producer (except the case when you don't have retries and you have in-flight records set to non-zero). – zaxme Jan 20 '19 at 13:42
2

The main difference is whether you want to block the calling thread waiting for the acknowledgment.

The following, using the Future.get() method, blocks the current thread until the send is completed before performing some action.

producer.send(record).get();
// Do some action

When using a Callback to perform some action, the code will execute in the I/O thread so it's non-blocking for the calling thread.

producer.send(record,
    new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            // Do some action
        }
    });

Though the docs say it is 'generally' executed in the producer's I/O thread:

Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or they will delay the sending of messages from other threads. If you want to execute blocking or computationally expensive callbacks it is recommended to use your own Executor in the callback body to parallelize processing.
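The hand-off the docs recommend could look roughly like this. It is a sketch with plain java.util.concurrent and no Kafka classes: the thread named "io" is a stand-in for the producer's I/O thread, "worker" is your own pool, and the class and method names are made up.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class OffloadDemo {
    // Returns the name of the thread that ended up doing the slow work.
    static String runSlowWorkOffIoThread() throws InterruptedException {
        // Stand-ins: "io" plays the producer's I/O thread, "worker" is our own pool.
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService workers = Executors.newFixedThreadPool(2, r -> new Thread(r, "worker"));
        AtomicReference<String> ranOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            // Callback body as it would run on the I/O thread: hand off and return at once.
            Runnable callback = () -> workers.execute(() -> {
                ranOn.set(Thread.currentThread().getName()); // slow work would go here
                done.countDown();
            });
            io.execute(callback);
            done.await();
            return ranOn.get();
        } finally {
            io.shutdown();
            workers.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runSlowWorkOffIoThread());
    }
}
```

The callback itself only submits a task, so the "io" thread is free again almost immediately while the expensive part runs on a worker.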

tchambers
  • Okay, but I would not need to call `get` on the `Future` right away. I can defer that until it has completed. – Thilo Apr 27 '17 at 11:11
  • I'm not sure if I understand correctly, but how can you defer the call to get() if you don't know that the asynchronous call to send() has completed? This is why the get() method of Future blocks until a result is available. – tchambers Apr 27 '17 at 13:20
  • There is `Future.isDone`. If you call `get` only after that has become `true`, `get` will return immediately. Someone has to poll or block to get the result eventually, but it does not have to be the thread that called into Kafka. – Thilo Apr 27 '17 at 13:26
  • You can collect all the `Future` objects and then resolve them in one go at the end of your transaction (whatever that means in your own context). In theory that ought to mean that waiting only happens when all your processing has been done. The documentation, and example practice, is a little unclear on if you could just keep and resolve one Future per destination topic partition, or if there would be any advantage in doing so. – jrg Jun 13 '19 at 18:47
2

My observations based on The Kafka Producer documentation:

  • Future gives you access to synchronous processing
  • Future might not guarantee acknowledgement. My understanding is that a Callback will execute after acknowledgement
  • Callback gives you access to fully non-blocking asynchronous processing.
    • There are also guarantees on the ordering of execution for a callback on the same partition

Callbacks for records being sent to the same partition are guaranteed to execute in order.

My other opinion is that the returned Future object and the Callback 'pattern' represent two different programming styles, and I think that this is the fundamental difference:

  • The Future represents Java's Concurrency Model Style.
  • The Callback represents Java's Lambda Programming Style (because Callback actually satisfies the requirement for a Functional Interface)

You can probably end up coding similar behaviors with both the Future and Callback styles, but in some use cases it looks like one style might be more advantageous than the other.

Shiraaz.M
  • Your saying about Future not guaranteeing acknowledgement is wrong. The Future is marked done right after callbacks are called. See function completeFutureAndFireCallbacks @ ProducerBatch class in kafka java client. – H. Opler Apr 05 '20 at 11:22
2

send() starts publishing a message to the Kafka cluster. It is an asynchronous call: it adds the message to a buffer and returns immediately. This can be combined with linger.ms to publish messages in batches for better performance. We can handle exceptions and control the flow either synchronously, by calling get() on the returned Future, or asynchronously, with a callback.

Each method has its own pros and cons and can be decided based on use cases.

Asynchronous send (Fire & Forget): We call the send method as below to publish a message without waiting for any success or error response.

producer.send(new ProducerRecord<String, String>("topic-name", "key", "value"));

In this scenario the producer does not wait for the first message to complete before it starts sending other messages. In case of an exception, the producer retries based on the retries config parameter, but if the message still fails after retrying, the application never learns about it. We may lose some messages in this case, but if a small amount of message loss is acceptable, this provides high throughput and low latency.

Synchronous send: A simple way to send a message synchronously is to use the get() method.

RecordMetadata recMetadata = producer.send(new ProducerRecord<String, String>("topic-name", "key", "value")).get();

producer.send() returns a Future of RecordMetadata, and calling get() on it waits for the reply from Kafka. We can catch the exception in case of an error, or use the returned RecordMetadata in case of success. RecordMetadata contains the offset, partition and timestamp, which can be logged. It's slow, but it is reliable: you know for every message whether it was delivered.
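How a failure surfaces through get() can be sketched with plain futures. This example uses CompletableFuture as a stand-in for the Future returned by send(), and the class name, messages and metadata string are invented for the illustration. The point it shows is that get() wraps the underlying failure in an ExecutionException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SyncErrorDemo {
    // Blocks on the future and reports either the value or the underlying failure.
    static String describe(Future<String> result) throws InterruptedException {
        try {
            return "ok: " + result.get();
        } catch (ExecutionException e) {
            // get() wraps the real failure; the cause is what went wrong.
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<String> success =
                CompletableFuture.completedFuture("partition 0, offset 42");
        CompletableFuture<String> failure = new CompletableFuture<>();
        failure.completeExceptionally(new IllegalStateException("broker unavailable"));
        System.out.println(describe(success));
        System.out.println(describe(failure));
    }
}
```

Because the caller sees the failure before moving on, it can stop, retry or log at exactly the message that failed.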

Asynchronous send with callback: We can also call the send() method with a callback, which is invoked with the result once the send completes. This is good if you want to send messages asynchronously, that is, without waiting for each send to complete, but still handle errors or update the status of message delivery.

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception ex) {...}
});

Note: Please don't confuse acks and retries with the asynchronous send call. Acks and retries apply to every send call, whether it is synchronous or asynchronous; the only difference is how you handle the returned result and failure scenarios. For example, with an asynchronous send the ack and retry rules still apply, but they are handled on a separate thread without blocking other threads from sending records in parallel. The only challenge is that we will not be aware of a failure, or of the time at which the message completed successfully.

Nitin