
I send String messages to Kafka v0.8 with the Java Producer API. If the message size is about 15 MB, I get a MessageSizeTooLargeException. I have tried to set message.max.bytes to 40 MB, but I still get the exception. Small messages work without problems.

(The exception appears in the producer; I don't have a consumer in this application.)

What can I do to get rid of this exception?

My example producer config

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

Error-Log:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
  • My first instinct would be to ask you to split this huge message into several smaller ones :-/ My guess is that this isn't possible for some reason but you may want to reconsider it nonetheless: Huge messages usually means there is a design flaw somewhere that should really be fixed. – Aaron Digulla Jan 09 '14 at 12:35
  • Thanks, but it would make my logic a lot more complex. Why is it a *bad* idea to use Kafka for messages around 15MB? Is 1 MB the maximum message size limit that can be used? I found not much about the message size limit in the Kafka documentation. – Sonson123 Jan 09 '14 at 13:02
  • This is completely unrelated to Kafka or any other message processing system. My reasoning: If something goes wrong with your 15MB file, then cleaning up the mess afterwards is very expensive. That's why I usually split large files into many smaller jobs (which can then usually be executed in parallel as well). – Aaron Digulla Jan 09 '14 at 13:49
  • have you used any compression? could you please share some more details, its kinda hard to guess something out of just one single word – user2720864 Jan 09 '14 at 20:29
  • For those who stumble upon this question, but use `librdkafka` for the communication with Kafka, see also: https://stackoverflow.com/questions/60739858/how-to-set-the-max-size-of-a-kafka-message-using-librdkafka – Miljen Mikic Oct 20 '20 at 14:28

9 Answers


You need to adjust three (or four) properties:

  • Consumer side: fetch.message.max.bytes - this will determine the largest size of a message that can be fetched by the consumer.
  • Broker side: replica.fetch.max.bytes - this will allow for the replicas in the brokers to send messages within the cluster and make sure the messages are replicated correctly. If this is too small, then the message will never be replicated, and therefore, the consumer will never see the message because the message will never be committed (fully replicated).
  • Broker side: message.max.bytes - this is the largest size of the message that can be received by the broker from a producer.
  • Broker side (per topic): max.message.bytes - this is the largest size of the message the broker will allow to be appended to the topic. This size is validated pre-compression. (Defaults to broker's message.max.bytes.)

I found out the hard way about number 2 - you don't get ANY exceptions, messages, or warnings from Kafka, so be sure to consider this when you are sending large messages.
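For illustration, a minimal sketch of these settings sized for a ~15 MB message (the 20 MB value is an assumption - use your largest expected payload plus some headroom):

# Broker: config/server.properties
message.max.bytes=20971520
replica.fetch.max.bytes=20971520

# Optional per-topic override (topic-level config) instead of the broker-wide message.max.bytes
max.message.bytes=20971520

# Consumer configuration (0.8-era consumer)
fetch.message.max.bytes=20971520

The broker-side values only take effect after the brokers have been restarted.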

laughing_man
  • Ok, you and user2720864 were correct. I had only set the `message.max.bytes` in the source code. But I have to set these values in the configuration of the Kafka server `config/server.properties`. Now also bigger messages work :). – Sonson123 Feb 03 '14 at 15:37
  • Are there any known disadvantages setting these values too high? – Ivan Balashov Aug 18 '14 at 20:06
  • Yes. On the consumer side, you allocate `fetch.message.max.bytes` memory for EACH partition. This means that if you use a huge number for `fetch.message.max.bytes` combined with a large number of partitions, it will consume a lot of memory. In fact, since the replication process between the brokers is also a specialized consumer, this will also consume memory on the brokers. – laughing_man Aug 18 '14 at 23:13
  • Post suggested changes, the consumer isn't able to consume the message http://stackoverflow.com/questions/32231095/kafka-kafka-common-messagesizetoolargeexception-at-consumer any ideas? – Kedar Parikh Aug 26 '15 at 15:49
  • fetch.message.max.bytes is to be added to consumer.properties – Kedar Parikh Aug 27 '15 at 05:34
  • Note there is also a `max.message.bytes` configuration *per-topic* which can be lower than the broker's `message.max.bytes`. – Peter Davis May 20 '16 at 16:41
  • According to official doc, the parameters on the consumer side and those regarding the replication between brokers `/.*fetch.*bytes/` don't seem to be hard limits: "This is not an absolute maximum, if [...] larger than this value, the record batch will still be returned to ensure that progress can be made." – bluu Dec 14 '18 at 17:05
  • As of Kafka 0.10.1.0, the response and partition limits *do not* have to be equal to or larger than the message size limit, as the fetch logic now accepts individual messages even if they exceed the fetch size limit. See http://kafka.apache.org/documentation/#upgrade_1010_notable. – Raman Apr 29 '19 at 18:15
  • Please consider editing your answer. I'm pretty sure that `fetch.message.max.bytes` is not a read limit but rather a batch size config. Messages larger than that will still be returned. – Gray Aug 28 '19 at 14:44
  • Hi, do you also need to set the `batch.size` and `linger.ms` on the producer side? https://www.cloudera.com/documentation/kafka/latest/topics/kafka_performance.html – jack Aug 28 '19 at 19:35
  • Change Kafka Configuration (default is 1 MB) (example 10 MB) - Topic Side: modify max.message.bytes=10485880 - Broker-Level, set max replication fetch size (example 10 MB) replica.fetch.max.bytes=10485880 & message.max.bytes=10485880 - Consumer side: max.partition.fetch.bytes=10485880 - Producer side : max.request.size=10485880 – codebased Nov 24 '22 at 14:56

Minor changes required for Kafka 0.10 and the new consumer compared to laughing_man's answer:

  • Broker: No changes, you still need to increase the properties message.max.bytes and replica.fetch.max.bytes. message.max.bytes has to be equal to or smaller(*) than replica.fetch.max.bytes.
  • Producer: Increase max.request.size to send the larger message.
  • Consumer: Increase max.partition.fetch.bytes to receive larger messages.

(*) Read the comments to learn more about message.max.bytes<=replica.fetch.max.bytes
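As an illustration, a minimal sketch of the client-side part with the 0.10+ Java clients (the broker address, group id and the 20 MB value are assumptions):

import java.util.Properties;

// Producer: allow requests larger than the 1 MB default
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("max.request.size", Integer.toString(20 * 1024 * 1024));

// Consumer: allow fetching partitions whose records exceed the 1 MB default
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "large-message-group");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("max.partition.fetch.bytes", Integer.toString(20 * 1024 * 1024));

The broker-side message.max.bytes and replica.fetch.max.bytes still have to be raised as well, as stated above.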

Sascha Vetter
  • Do you know why `message.max.bytes` needs to be smaller than `replica.fetch.max.bytes`? – Kostas Mar 23 '17 at 03:17
  • "**replica.fetch.max.bytes** (default: 1MB) – Maximum size of data that a broker can replicate. This has to be larger than **message.max.bytes**, or a broker will accept messages and fail to replicate them. Leading to potential data loss." Source: [handling-large-messages-kafka](http://ingest.tips/2015/01/21/handling-large-messages-kafka/) – Sascha Vetter Mar 23 '17 at 18:32
  • Thank you for getting back to me with a link. This seems to echo what the [Cloudera guide](https://www.cloudera.com/documentation/kafka/latest/topics/kafka_performance.html) suggests as well. Both of these however are wrong - notice that they don't offer any technical reason as to _why_ `replica.fetch.max.bytes` should be strictly larger than `message.max.bytes`. A Confluent employee [confirmed earlier today](https://lists.apache.org/thread.html/dc9c636dd0b4b1bfe6a0310b6e550ebf71ef6b197910ab40d7500bca@%3Cusers.kafka.apache.org%3E) what I suspected: that the two quantities can, in fact, be equal. – Kostas Mar 24 '17 at 01:29
  • Ugh. I just realized that your link points to an article written by Gwen, who _also_ works for Confluent. So we seem to have an odd disagreement here. I'll post on the kafka-users mailing list and see what's going on. I'll update this thread with my findings. – Kostas Mar 24 '17 at 01:32
  • Are there any updates regarding `message.max.bytes` <= `replica.fetch.max.bytes`? – Sascha Vetter Apr 27 '17 at 21:49
  • Yes, they can be equal: https://www.mail-archive.com/users@kafka.apache.org/msg25494.html (Ismael works for Confluent) – Kostas Apr 28 '17 at 05:19

The answer from @laughing_man is quite accurate. Still, I want to add a recommendation which I learned from Kafka expert Stephane Maarek. We actively applied this solution in our live systems.

Kafka isn’t meant to handle large messages.

Your API should use cloud storage (for example, AWS S3) and simply push a reference to the S3 object to Kafka or any other message broker. You'll need to find a place to save your data, whether that's a network drive or something else entirely, but it shouldn't be a message broker.
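A minimal sketch of that pattern (a sketch only - the topic, key and S3 URI are placeholders, and the actual upload to S3 is assumed to have already happened, e.g. via the AWS SDK):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ReferencePublisher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // The 15 MB payload lives in object storage; the Kafka record only carries the pointer.
        String documentId = "order-4711";                        // hypothetical business key
        String s3Uri = "s3://my-bucket/payloads/" + documentId;  // written after the upload succeeded

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("datasift", documentId, s3Uri));
        }
    }
}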

If you don't want to proceed with the recommended and reliable solution above:

The default maximum message size in Apache Kafka is 1 MB (the setting on your brokers is called message.max.bytes). If you really need it, you could increase that size, and make sure to increase the network buffers for your producers and consumers as well.

And if you really care about splitting your message, make sure each split has the exact same key so that it gets pushed to the same partition, and have your message content report a “part id” so that your consumer can fully reconstruct the message, as in the sketch below.
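A rough sketch of that splitting approach (the chunk size, the "documentId;partId;partCount;data" encoding and the topic name are made up for illustration; the consumer would buffer parts per key until all partCount pieces have arrived):

import java.util.Arrays;
import java.util.Base64;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

static void sendInChunks(KafkaProducer<String, String> producer, byte[] payload) {
    String documentId = UUID.randomUUID().toString();  // shared key for all parts
    int chunkSize = 512 * 1024;                         // assumed chunk size, well below message.max.bytes
    int partCount = (payload.length + chunkSize - 1) / chunkSize;

    for (int part = 0; part < partCount; part++) {
        int from = part * chunkSize;
        int to = Math.min(from + chunkSize, payload.length);
        byte[] chunk = Arrays.copyOfRange(payload, from, to);
        // encode the "part id" and total count so the consumer can reassemble the message
        String value = documentId + ";" + part + ";" + partCount + ";"
                + Base64.getEncoder().encodeToString(chunk);
        // same key => same partition, so the parts arrive in order
        producer.send(new ProducerRecord<>("datasift", documentId, value));
    }
}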

If the message is text-based, try to compress the data, which may reduce the data size, but not magically.

Again, you have to use an external system to store that data and just push an external reference to Kafka. That is a very common and widely accepted architecture, and one you should go with.

Keep in mind that Kafka works best only if the messages are huge in amount, but not in size.

Source: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka

Player_Neo
  • Kafka works with large messages, absolutely no issue. The intro page on the Kafka home page even references it as a storage system. – calloc_org Jul 09 '20 at 13:11
  • @Bhanu Hoysala - I agree large messages should be persisted to storage then a reference sent in the message. That being said, how do you guarantee that both the data gets written and the reference message gets pushed atomically? Both must succeed or neither. – Jeremy Oct 02 '20 at 04:56
  • @Jeremy We need to have another topic/queue listing to the changes done to the bucket (we can configure to get the notification for only create event). In a success case, we will get the message according to the configuration (You do not receive event notifications from failed operations in S3). In a failed case, the file uploading service will know whether the write is success or not (this is a synchronous operation). https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html Depends on the Broker & Storage combinations various kinds of integrations can be done. – Player_Neo Oct 03 '20 at 16:05
  • @Player_Neo, you said that "Kafka isn’t meant to handle large messages.". Could you also throw light on what are the impacts of increasing message size? – Rajdeep Siddhapura Jan 11 '22 at 10:01

The idea is to have the same maximum message size supported all the way from the Kafka producer to the Kafka broker and on to the Kafka consumer, i.e.

Kafka producer --> Kafka Broker --> Kafka Consumer

Suppose the requirement is to send a 15 MB message; then the producer, the broker and the consumer, all three, need to be in sync.

Kafka Producer sends 15 MB --> Kafka Broker Allows/Stores 15 MB --> Kafka Consumer receives 15 MB

The settings therefore should be:

a) on Broker:

message.max.bytes=15728640 
replica.fetch.max.bytes=15728640

b) on Consumer:

fetch.message.max.bytes=15728640
Ravi

You need to override the following properties:

Broker Configs ($KAFKA_HOME/config/server.properties)

  • replica.fetch.max.bytes
  • message.max.bytes

Consumer Configs ($KAFKA_HOME/config/consumer.properties)
This step didn't work for me; I added it to the consumer app instead and it worked fine.

  • fetch.message.max.bytes

Restart the server.

Look at this documentation for more info: http://kafka.apache.org/08/configuration.html

user2550587
  • for the commandline consumer, I need to use the --fetch-size= flag. It doesn't seem to read the consumer.properties file (kafka 0.8.1). I would also recommend turning on compression from the producer side using the compression.codec option. – Ziggy Eunicien May 09 '14 at 00:08
  • Ziggy's comment worked for me kafka 0.8.1.1. Thank you! – James Apr 04 '15 at 03:32
  • could it be that fetch.message.max.bytes is replaced by max.partition.fetch.bytes in ConsumerConfig? – nano_nano Jun 17 '16 at 12:05

I think most of the answers here are somewhat outdated or not entirely complete.

Referring to the answer of Sascha Vetter (with the update for Kafka 0.10), I'd like to provide some additional information and links to the official documentation.


Producer Configuration:

  • max.request.size may have to be increased on the producer side if messages bigger than the default of 1 MB should be sent.

Broker/Topic configuration:

  • message.max.bytes (Link) may be set if one likes to increase the message size on the broker level. But, from the documentation: "This can be set per topic with the topic level max.message.bytes config."
  • max.message.bytes (Link) may be increased if only one topic should be able to accept larger files. The broker configuration does not have to be changed.

I'd always prefer a topic-restricted configuration, because I can configure the topic myself as a client of the Kafka cluster (e.g. with the admin client, as sketched below); I may not have any influence on the broker configuration itself.
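For example, a sketch of raising the limit for a single topic with the Java AdminClient (the topic name and the ~20 MB value are assumptions; requires a kafka-clients version that supports incrementalAlterConfigs):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RaiseTopicMessageSize {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "datasift");
            AlterConfigOp raiseLimit = new AlterConfigOp(
                    new ConfigEntry("max.message.bytes", "20971520"),  // ~20 MB
                    AlterConfigOp.OpType.SET);
            // Only this topic is touched; the broker-wide message.max.bytes stays untouched.
            admin.incrementalAlterConfigs(
                    Collections.singletonMap(topic, Collections.singletonList(raiseLimit)))
                 .all().get();
        }
    }
}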


In the answers from above, some more configurations are mentioned as necessary:

From the documentation: "This is not an absolute maximum, if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that progress can be made."

From the documentation: "Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress."

From the documentation: "Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress."


Conclusion: The configurations regarding fetching messages do not have to be changed to process messages larger than the default values of these configurations (I tested this in a small setup). Probably, the consumer will then always get batches of size 1. However, two of the configurations from the first block have to be set, as mentioned in the answers before.

This clarification is not meant to say anything about performance and is not a recommendation to set or not to set these configurations. The best values have to be evaluated individually, depending on the concretely planned throughput and data structure.

MichaelCkr

One key thing to remember is that the message.max.bytes attribute must be in sync with the consumer's fetch.message.max.bytes property. The fetch size must be at least as large as the maximum message size; otherwise there could be a situation where producers can send messages larger than the consumer can consume/fetch. It might be worth taking a look at that.
Which version of Kafka are you using? Also provide some more details about the trace you are getting. Is there something like ... payload size of xxxx larger than 1000000 coming up in the log?

user2720864
  • I have updated my question with more information: Kafka Version 2.8.0-0.8.0; now I only need the producer. – Sonson123 Jan 10 '14 at 08:06

For people using Landoop Kafka: you can pass the config values as environment variables, like:

docker run -d --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 \
  -p 9581-9585:9581-9585 -p 9092:9092 \
  -e KAFKA_TOPIC_MAX_MESSAGE_BYTES=15728640 \
  -e KAFKA_REPLICA_FETCH_MAX_BYTES=15728640 \
  landoop/fast-data-dev:latest

This sets topic.max.message.bytes and replica.fetch.max.bytes on the broker.

And if you're using rdkafka then pass the message.max.bytes in the producer config like:

const producer = new Kafka.Producer({
  'metadata.broker.list': 'localhost:9092',
  'message.max.bytes': '15728640',
  'dr_cb': true
});

Similarly, for the consumer:

const kafkaConf = {
  "group.id": "librd-test",
  "fetch.message.max.bytes": "15728640",
  ...
};
informer

Here is how I successfully sent data of up to 100 MB using kafka-python==2.0.2:

Consumer:

consumer = KafkaConsumer(
    ...
    max_partition_fetch_bytes=max_bytes,
    fetch_max_bytes=max_bytes,         
)

Producer (See final solution at the end):

producer = KafkaProducer(
    ...
    max_request_size=KafkaSettings.MAX_BYTES,
)

Then:

producer.send(topic, value=data).get()

After sending data like this, the following exception appeared:

MessageSizeTooLargeError: The message is n bytes when serialized which is larger than the total memory buffer you have configured with the buffer_memory configuration.

Finally I increased buffer_memory (default 32 MB) and the message arrived on the other end.

producer = KafkaProducer(
    ...
    max_request_size=KafkaSettings.MAX_BYTES,
    buffer_memory=KafkaSettings.MAX_BYTES * 3,
)
Tobias Ernst