The KafkaProducer send method both returns a Future and accepts a Callback.
Is there any fundamental difference between using one mechanism over the other to execute an action upon completion of the sending?
Looking at the documentation you linked to, the main difference between the Future and the Callback lies in who initiates the "request is finished, what now?" question.
Let's say we have a customer C and a baker B, and C is asking B to make him a nice cookie. Now there are two possible ways the baker can return the delicious cookie to the customer.
Future
The baker accepts the request and tells the customer: Ok, when I'm finished I'll place your cookie here on the counter. (This agreement is the Future.)
In this scenario, the customer is responsible for checking the counter (Future) to see if the baker has finished his cookie or not.
Blocking: The customer stays near the counter and looks at it until the cookie is put there (Future.get()) or the baker puts an apology there instead (Error: Out of cookie dough).
Non-blocking: The customer does some other work, and once in a while checks if the cookie is waiting for him on the counter (Future.isDone()). If the cookie is ready, the customer takes it (Future.get()).
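The counter scenario can be sketched in plain Java without touching Kafka at all. This is only an illustration: CounterBakery, order, waitAtCounter and pollCounter are made-up names, and a single-thread ExecutorService stands in for the baker.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CounterBakery {
    // Hypothetical baker: bakes on its own thread and hands back a Future (the "counter").
    static Future<String> order(ExecutorService baker, boolean outOfDough) {
        return baker.submit(() -> {
            if (outOfDough) {
                throw new IllegalStateException("Out of cookie dough");
            }
            Thread.sleep(50); // baking time
            return "cookie";
        });
    }

    // Blocking: stand at the counter until the cookie or the apology shows up.
    static String waitAtCounter(Future<String> counter) throws InterruptedException {
        try {
            return counter.get();
        } catch (ExecutionException e) {
            return "apology: " + e.getCause().getMessage();
        }
    }

    // Non-blocking: glance at the counter now and then, doing other work in between.
    static String pollCounter(Future<String> counter) throws InterruptedException {
        while (!counter.isDone()) {
            Thread.sleep(5); // the customer's other work would go here
        }
        return waitAtCounter(counter); // the result is ready, so get() returns at once
    }

    public static void main(String[] args) throws Exception {
        ExecutorService baker = Executors.newSingleThreadExecutor();
        try {
            System.out.println(waitAtCounter(order(baker, false))); // cookie
            System.out.println(pollCounter(order(baker, false)));   // cookie
            System.out.println(waitAtCounter(order(baker, true)));  // apology: Out of cookie dough
        } finally {
            baker.shutdown();
        }
    }
}
```

In both variants it is the customer's thread that asks for the result; the baker never reaches out on his own.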
Callback
In this second scenario the customer, after ordering his cookie, tells the baker: When my cookie is ready please give it to my pet robot dog here, he'll know what to do with it. (This robot is the Callback.)
Now, when the cookie is ready, the baker gives it to the dog and tells him to run back to its owner. The baker can continue baking the next cookie for another customer.
The dog runs back to the customer and starts wagging its artificial tail to make the customer aware that his cookie is ready.
Notice how the customer didn't have any idea when the cookie would be given to him, nor was he actively polling the baker to see if it was ready.
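The robot-dog scenario can be sketched the same way in plain Java. Again this is an illustration under assumptions, not Kafka code: RobotDogBakery and its methods are made-up names, a BiConsumer plays the role of the callback, and the baker's thread is named "baker-thread" so the output shows who actually runs the callback.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

class RobotDogBakery {
    // Hypothetical baker: bakes on its own thread, then calls the robot dog on that same thread.
    static void order(ExecutorService baker, BiConsumer<String, Exception> robotDog) {
        baker.execute(() -> {
            try {
                Thread.sleep(50); // baking time
                robotDog.accept("cookie", null);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                robotDog.accept(null, e);
            }
        });
    }

    static String orderAndReport() throws InterruptedException {
        ExecutorService baker = Executors.newSingleThreadExecutor(r -> new Thread(r, "baker-thread"));
        AtomicReference<String> report = new AtomicReference<>();
        CountDownLatch delivered = new CountDownLatch(1);

        // The callback: the customer never polls, the baker's thread invokes this.
        order(baker, (cookie, error) -> {
            report.set(cookie + " delivered on " + Thread.currentThread().getName());
            delivered.countDown();
        });

        // The customer is free to do other work here.
        delivered.await(); // only so this demo can return the report
        baker.shutdown();
        return report.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(orderAndReport()); // cookie delivered on baker-thread
    }
}
```

The callback body runs on the baker's thread, not the customer's, which is why a slow callback holds up the baker.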
That's the main difference between the two scenarios: who is responsible for initiating the "your cookie is ready, what do you want to do with it?" question. With the Future, the customer is responsible for checking when it's ready, either by actively waiting or by polling every now and then. In the case of the Callback, the baker will call back to the provided function.
I hope this answer gives you better insight into what a Future and a Callback actually are. Once you've got the general idea, you could try to find out on which thread each specific thing is handled, when a thread is blocked, and in what order everything completes. Writing some simple programs that print statements like "main client thread: cookie received" could be a fun way to experiment with this.
The asynchronous approach
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata rm, Exception ex) {...}
});
gives you better throughput than the synchronous one
RecordMetadata rm = producer.send(record).get();
since in the first case you don't wait for the acknowledgement before sending the next message.
Also, with the asynchronous approach ordering is not guaranteed (a retried batch can land after a later one when max.in.flight.requests.per.connection is greater than 1 and idempotence is off), whereas with the synchronous approach it is, because each message is sent only after the acknowledgement for the previous one has been received.
Another difference is that with the synchronous call you can stop sending messages straight away when an exception occurs, whereas with the asynchronous call some further messages will already have been sent before you discover that something is wrong and can react.
Also note that in the asynchronous approach the number of requests that are "in flight" on each connection is controlled by the max.in.flight.requests.per.connection parameter.
Apart from the synchronous and asynchronous approaches you can use the Fire and Forget approach, which looks like the synchronous call but without calling get() on the returned Future or processing the metadata. You just send the message and hope that it will reach the broker. Most likely it will, and the producer will retry in case of recoverable errors, but there is a chance that some messages will be lost:
producer.send(record);
To summarize:
The main difference is whether you want to block the calling thread waiting for the acknowledgment.
The following code uses the Future.get() method, which blocks the current thread until the send has completed, and only then performs some action.
producer.send(record).get();
// Do some action
When using a Callback to perform some action, the code will execute in the I/O thread so it's non-blocking for the calling thread.
producer.send(record,
    new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            // Do some action
        }
    });
Though the docs say it is 'generally' executed in the producer's I/O thread:
Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or they will delay the sending of messages from other threads. If you want to execute blocking or computationally expensive callbacks it is recommended to use your own Executor in the callback body to parallelize processing.
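The "use your own Executor in the callback body" advice can be illustrated with a small stand-alone sketch. Nothing here is Kafka API: CallbackOffload is a made-up class, the thread named "io-thread" merely stands in for the producer's I/O thread, and "callback-worker" is an assumed name for your own pool. The callback does almost nothing itself and hands the slow part to the worker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class CallbackOffload {
    // Returns {thread that ran the callback, thread that ran the heavy work}.
    static String[] run() throws InterruptedException {
        // Stand-in for the producer's I/O thread (hypothetical, not Kafka's actual thread).
        ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        // Your own executor for slow or blocking callback work.
        ExecutorService workers = Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-worker"));

        AtomicReference<String> callbackThread = new AtomicReference<>();
        AtomicReference<String> heavyThread = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        // The callback runs on the I/O thread, so it only records where it ran and hands off.
        Runnable callback = () -> {
            callbackThread.set(Thread.currentThread().getName());
            workers.execute(() -> {
                // Expensive processing would go here, off the I/O thread.
                heavyThread.set(Thread.currentThread().getName());
                done.countDown();
            });
        };

        ioThread.execute(callback); // simulates a send completing
        done.await();
        ioThread.shutdown();
        workers.shutdown();
        return new String[] {callbackThread.get(), heavyThread.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        String[] threads = run();
        // callback ran on io-thread, heavy work ran on callback-worker
        System.out.println("callback ran on " + threads[0] + ", heavy work ran on " + threads[1]);
    }
}
```

With a real producer the same shape would apply inside onCompletion: keep the body short and submit anything slow to an executor you own.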
My observations based on the Kafka Producer documentation:
Future gives you access to synchronous processing.
Future might not guarantee acknowledgement. My understanding is that a Callback will execute after acknowledgement.
Callback gives you access to fully non-blocking asynchronous processing.
Callbacks for records being sent to the same partition are guaranteed to execute in order.
My other opinion is that the Future return object and the Callback 'pattern' represent two different programming styles, and I think that this is the fundamental difference:
Future represents Java's Concurrency Model Style.
Callback represents Java's Lambda Programming Style (because Callback actually satisfies the requirement for a Functional Interface).
You can probably end up coding similar behaviors with both the Future and Callback styles, but in some use cases it looks like one style might be more advantageous than the other.
send() starts publishing a message to the Kafka cluster. It is an asynchronous call: it adds the message to a buffer and returns immediately. Combined with linger.ms, this lets the producer publish messages in batches for better performance. We can handle errors and control the flow either synchronously, by calling get() on the returned Future, or asynchronously, with a callback.
Each approach has its own pros and cons, and the choice depends on the use case.
Asynchronous send (Fire & Forget): We call the send method as below to publish a message without waiting for any success or error response.
producer.send(new ProducerRecord<String, String>("topic-name", "key", "value"));
This approach does not wait for the first message to complete before it starts sending the next ones. In case of an exception the producer retries according to the retries config parameter, but if the message still fails after retrying, the application never finds out. We may lose some messages this way, but if a small amount of message loss is acceptable, this gives high throughput and low latency.
Synchronous send: A simple way to send a message synchronously is to use the get() method.
RecordMetadata recMetadata = producer.send(new ProducerRecord<String, String>("topic-name", "key", "value")).get();
producer.send() returns a Future of RecordMetadata, and calling get() on it waits for the reply from Kafka. In case of an error get() throws an exception that we can catch; in case of success it returns the RecordMetadata, which contains the offset, partition and timestamp that we can log. It's slow, but it gives high reliability, because we find out about every failure before sending the next message.
Asynchronous send with callback: We can also call the send() method with a callback, which is invoked with the response once the send has completed. This is good if you want to send messages asynchronously, that is, without waiting for each one to complete, but still handle errors or update the status of message delivery.
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception ex) {...}
});
Note: Please don't confuse acks and retries with the synchronous or asynchronous send call. Acks and retries apply to every send call, whether it is synchronous or asynchronous; the only difference is how you handle the returned result and the failure scenario. For example, if you send asynchronously, the acks and retries rules still apply, but they are handled on an independent thread without blocking other threads from sending records in parallel. The only challenge is that, without a callback or a get() call, we will not know about a failure or when the message completed successfully.