
Using the Apache Kafka Java client (0.9), I'm trying to send a long series of records to the broker using the Kafka Producer class.

The asynchronous send method returns immediately for a while, then starts blocking on each call for a short time period. After around thirty seconds, the client starts throwing exceptions (TimeoutException), with the message "Batch expired".

What circumstances cause this exception to be thrown?

James Thomas

7 Answers


This exception indicates you are queueing records at a faster rate than they can be sent.

When you call the send method, the ProducerRecord will be stored in an internal buffer for sending to the broker. The method returns immediately once the ProducerRecord has been buffered, regardless of whether it has been sent.

Records are grouped into batches for sending to the broker, to reduce the transport overhead per message and increase throughput.

Once a record is added to a batch, there is a time limit for sending that batch, so that it is dispatched within a specified duration. This is controlled by the producer configuration parameter request.timeout.ms, which defaults to thirty seconds.

If the batch has been queued longer than the timeout limit, the exception will be thrown. Records in that batch will be removed from the send queue.

Increasing the timeout limit, using the configuration parameter, will allow the client to queue batches for longer before expiring.
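For reference, a minimal sketch of raising that timeout via the producer's Properties. Only the request.timeout.ms key comes from this answer; the broker address and serializer classes here are illustrative placeholders, and the resulting Properties would be passed to new KafkaProducer<>(props):

```java
import java.util.Properties;

public class ProducerTimeoutConfig {

    // Builds the configuration map a KafkaProducer would be constructed with.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative address
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Raise the batch expiry window from the 30 s default to 60 s,
        // giving the sender more time to drain the internal buffer.
        props.put("request.timeout.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("request.timeout.ms"));
    }
}
```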

James Thomas
  • I wonder whether setting the `batch.size` to 0 (or a value between 1 and the standard value) would actually resolve the issue better? – Robert Metzger Feb 05 '16 at 12:41
  • Hi @JamesThomas, "indicates you are queueing records at a faster rate than they can be sent": what if I don't want to queue at all? There will be lots of traffic in our production environment, and we want to send data as soon as it arrives. We don't want to see this expiring at all. We've left linger.ms at its default and are still getting this issue. You said increasing request.timeout.ms will increase the batching span. – Anil Kumar Dec 09 '17 at 09:55
  • 1
    Hello @AnilKumar. In my issue, I was sending data faster than the upload rate for my network interface. Increasing the timeout allowed the producer to have time to send all the data. If you reduce the linger and batch size parameters, you can reduce the artificial delay but you still won't be able to go faster than the network. There's an overhead to each Kafka message so it might make more sense to have a larger batch size and linger time to group messages together. There's more information here that explains: http://ingest.tips/2015/07/19/tips-for-improving-performance-of-kafka-producer/ – James Thomas Dec 11 '17 at 10:00
  • @RobertMetzger Will the Kafka producer retry (if retries in the producer config is set > 3) in the case of "xxx ms has passed since batch creation plus linger time"? – Prakash_se7en Apr 05 '22 at 15:54
  • Can someone please help me in reconciling the above answer with what the official documentation says (request.timeout.ms: The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.)? Source: https://kafka.apache.org/documentation.html#producerconfigs_request.timeout.ms – Prabhatika Vij Feb 10 '23 at 06:44

I got this exception in a completely different context.

I have set up a mini cluster consisting of a ZooKeeper VM, a broker VM, and a producer/consumer VM. I opened all the necessary ports on the broker (9092) and on ZooKeeper (2181), and then tried to publish a message from the producer/consumer VM to the broker. I got the exception mentioned by the OP, but since I had only published (or at least tried to publish) a single message so far, the solution couldn't be to increase the timeout or the batch size.

Searching further, I found this mailing list thread describing a similar problem I had hit when trying to consume messages from within the producer/consumer VM (ClosedChannelException): http://grokbase.com/t/kafka/users/152jsjekrm/having-trouble-with-the-simplest-remote-kafka-config The last post in that thread actually describes how to solve the problem.

Long story short, if you face both the ClosedChannelException and the batch expired exception, you likely have to set the following line in the server.properties file and restart the broker:

advertised.host.name=<broker public IP address>

If it isn't set, Kafka falls back to the host.name property (which probably isn't set either) and then to the canonical host name determined by Java's InetAddress class, which of course isn't correct from the point of view of remote nodes and therefore confuses them.

Roberto
  • listeners=PLAINTEXT://domain_name:9092, port=9092, host.name=localhost, advertised.host.name=domain_name. I could send messages locally, but when I made the Kafka server live I got this exception: "org.apache.kafka.common.errors.TimeoutException: Batch Expired java.util.concurrent.ExecutionException". Where am I going wrong? – jack AKA karthik Jan 30 '17 at 07:36
  • The config file is under `/config/server.properties` for `0.10.2.1` if anyone else is looking. – Edd Jan 22 '18 at 13:29

The producer configuration parameter that controls how long records wait before being sent to the broker is linger.ms. Its default value is 0 (no delay).
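A hedged sketch of how linger.ms interacts with batch.size in the producer configuration. The key names and the 16384-byte default are from Kafka's documented producer configs; the 5 ms linger value is purely illustrative, not a recommendation:

```java
import java.util.Properties;

public class BatchingConfig {

    public static Properties build() {
        Properties props = new Properties();
        // Wait up to 5 ms for more records before sending a batch,
        // trading a little latency for better grouping.
        props.put("linger.ms", "5");
        // A batch is also sent as soon as it reaches this size (bytes),
        // regardless of linger.ms. 16384 is Kafka's default.
        props.put("batch.size", "16384");
        return props;
    }

    public static void main(String[] args) {
        Properties p = build();
        System.out.println(p.getProperty("linger.ms") + " / " + p.getProperty("batch.size"));
    }
}
```

With linger.ms left at 0, a batch is eligible to be sent immediately, so batching only happens when records arrive faster than the sender thread can drain them.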

S. Sar
  • It might not be clear from the original question what was happening in my example, I've tried to give more details now to make it clearer. See the comment below for full information. It was an issue that I was queueing records faster than my upload bandwidth. – James Thomas Jan 25 '16 at 10:22

I am using Kafka Java client version 0.11.0.0. I also started seeing the same pattern when producing large messages: it failed consistently for some messages and succeeded for others, although both the passing and failing messages were the same size.

In my case, each message was around 60 KB, far larger than Kafka's default batch.size of 16 kB, and my linger.ms was left at its default of 0. The error is thrown because the producer client times out before it can receive a successful response from the server. In my code, this call was timing out: kafkaProd.send(pr).get(). To fix this, I had to increase the producer client's request.timeout.ms from its default to 60000.
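The size mismatch described above can be checked with simple arithmetic: a ~60 KB record does not fit within the default 16 KB batch.size, so every such record effectively travels in a batch of its own (a sketch; the 16384-byte default comes from Kafka's producer configuration docs):

```java
public class BatchSizing {
    public static void main(String[] args) {
        int defaultBatchSizeBytes = 16_384; // Kafka's default batch.size
        int messageSizeBytes = 60 * 1024;   // ~60 KB messages from this answer
        // Records larger than batch.size each get a batch of their own,
        // so batching provides no grouping benefit for these messages.
        boolean exceedsBatch = messageSizeBytes > defaultBatchSizeBytes;
        System.out.println(exceedsBatch); // prints "true"
    }
}
```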

Binita Bharati
  • 2
    Upping the timeout may help, but it just seems like we're kicking the can down the road instead of addressing the root cause. – Lo-Tan Dec 10 '18 at 18:25
  • Facing a similar issue. I had tried to set the timeout to 120000 but if the load is there post 120000 ms then the producer throws batch expired. – Piyush Upadhyay Jun 09 '20 at 07:14

Had a similar issue with Kafka running in a docker-compose. My docker-compose.yml was set with

 KAFKA_ADVERTISED_HOST_NAME: kafka
 ports:
   - 9092:9092

But when I tried to send a message with Camel from outside Docker

to("kafka:test?brokers=localhost:9092")

I got a TimeoutException. I solved it by adding

127.0.0.1 kafka

to the Windows\System32\drivers\etc\hosts file and then changing my Camel URL to

to("kafka:test?brokers=kafka:9092")
Rory G
  • Actually my problem was caused by my topic not being setup properly. Cleaned all persistent data from docker and started fresh and it worked. – Rory G May 11 '18 at 07:53
  • Thank you @Rory G! I thought I was going nuts. Everything was working, then it stopped working. I was chasing my tail on this problem for a while, then took your advice to wipe out my containers and storage before starting everything up again, and it solved my problem. It's spooky action at a distance, though, and I don't want this to be a problem each time. – afenkner Jun 01 '18 at 21:56
  • Just maintaining stuff around Kafka is starting to scare me. Randomly breaks after running for a month or two (seemingly the consumer driver). Do not like. – Lo-Tan Dec 10 '18 at 18:24

I solved it.

My Kafka broker is deployed in a Docker container, the container's network mode is bridge, the host and container use port mappings, and I had changed the Kafka server's default port to 9102.

The configuration items in server.properties that solve the problem are these two: `listeners` and `advertised.listeners`.

I tried several combinations:

success:

listeners=PLAINTEXT://:9102
advertised.listeners=PLAINTEXT://192.168.0.136:9102

server can't start:

listeners=PLAINTEXT://192.168.0.136:9102
advertised.listeners=PLAINTEXT://192.168.0.136:9102

timeout error:

listeners=PLAINTEXT://:9102
advertised.listeners=PLAINTEXT://:9102
WangTanxu

When you create the consumer, set ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true.

sse