
We are using a Kafka Streams state store in our project, and we want to store more than 1 MB of data, but we got the exception below:

The message is 1760923 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

Then I followed the link "Add prefix to StreamsConfig to enable setting default internal topic configs" and added the following config:

topic.max.request.size=50000000

The application then works fine, and it keeps working as long as the state store's internal topic already exists. But when Kafka is restarted and the state store topic is lost/deleted, the Kafka Streams processor needs to recreate the internal state store topic automatically on startup, and at that moment it throws an exception:

"Aorg.apache.kafka.streams.errors.StreamsException: Could not create topic data-msg-seq-state-store-changelog. at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:148)....
.....
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic config name: max.request.size".

A workaround is to manually create the internal topic, but that should not be the proper solution.
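For reference, the manual workaround looks roughly like this (a sketch using the AdminClient; the bootstrap server, partition count, replication factor, and 50 MB limit are just examples, not taken from our actual setup):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // example address

try (AdminClient admin = AdminClient.create(adminProps)) {
    // Changelog topics are compacted; max.message.bytes is the topic-level size limit.
    Map<String, String> configs = new HashMap<>();
    configs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
    configs.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "50000000"); // example limit
    // Partition count must match the number of stream tasks for the store.
    NewTopic changelog = new NewTopic("data-msg-seq-state-store-changelog", 1, (short) 1)
            .configs(configs);
    admin.createTopics(Collections.singletonList(changelog)).all().get();
}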

Can you help me with this issue? Is there any config I have missed?

Thanks very much.

17 June 2020 update: the issue is still not resolved. Can anyone help?

Jay
  • `max.request.size` is a producer config, not a topic config: https://kafka.apache.org/documentation/#max.request.size – Matthias J. Sax Jun 06 '20 at 20:37
  • Yes. Does that mean I need to configure this for the producer on the broker? Or am I misunderstanding something, since it's an internal topic? – Jay Jun 08 '20 at 03:18
  • It means you need to configure it on the producer. -- Internally, Kafka Streams uses the Kafka consumer/producer/admin clients, and you can configure them as if you were using those clients directly: https://docs.confluent.io/current/streams/developer-guide/config-streams.html#ak-consumers-producer-and-admin-client-configuration-parameters – Matthias J. Sax Jun 09 '20 at 00:31
  • Actually, I have configured max.request.size for the producer, and if I don't configure it for the internal topic (changelog), it throws the exception. – Jay Jun 09 '20 at 08:28

2 Answers


The solution you are looking for lies in the Kafka Streams configuration properties that you set before starting the stream:

props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5242880");

The value used here is 5 MB (5242880 bytes). You can change it to suit your needs.
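For context, here is a minimal sketch of where that property fits when building the Streams application (the application id, bootstrap server, and topology are placeholders, not taken from the question):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "data-msg-seq-app");  // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
// Forwarded to the internal producer only; raises the client-side request limit to 5 MB.
props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5242880");

StreamsBuilder builder = new StreamsBuilder();
// ... build your topology here ...
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();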

maxaxis

I don't see a topic configuration named max.request.size. Maybe it is max.message.bytes (Topic configuration reference), so you may try setting that instead.

You can also refer to the broker setting message.max.bytes and increase it; it applies the limit at the broker level.

Documentation states:

The largest record batch size allowed by Kafka (after compression if compression is enabled). If this is increased and there are consumers older than 0.10.2, the consumers' fetch size must also be increased so that they can fetch record batches this large. In the latest message format version, records are always grouped into batches for efficiency. In previous message format versions, uncompressed records are not grouped into batches and this limit only applies to a single record in that case. This can be set per topic with the topic level max.message.bytes config.

Default: 1048588 (~1 MB) (Confluent Kafka)

Also refer to the following Stack Overflow answer.
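If the goal is to apply this to the changelog topic that Streams creates itself, a sketch of the corresponding setting via StreamsConfig.topicPrefix follows (note the broker-side message.max.bytes must also allow messages of this size; the 50 MB value is an example):

import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

// max.message.bytes is a valid topic config (unlike max.request.size), so it is
// accepted when Streams creates its internal changelog/repartition topics.
props.put(StreamsConfig.topicPrefix(TopicConfig.MAX_MESSAGE_BYTES_CONFIG), "52428800"); // 50 MB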

JavaTechnical
  • Thanks for your reply. I also do not see this config in the code, but my question is: when I do not add this parameter, why does it throw an exception saying "The message is 1760923 bytes when serialized which is larger than the maximum request size you have configured with the **max.request.size** configuration.", where it mentions max.request.size? – Jay Jun 04 '20 at 08:50
  • Which version of Kafka are you using? – JavaTechnical Jun 04 '20 at 09:59
  • The version is confluentinc/cp-kafka:5.2.1 – Jay Jun 05 '20 at 01:28
  • `max.request.size` is a producer config, not a topic config: https://kafka.apache.org/documentation/#max.request.size – Matthias J. Sax Jun 06 '20 at 20:36