I have researched different configs, including answers on Stack Overflow, but have been stuck on this for several days, so I created a separate question. I am trying to configure Kafka to send large messages (10–50 MB). I run Kafka in Docker (image confluentinc/cp-kafka:7.2.1). I also understand that Kafka is not the best tool for this. I configure Kafka from Java as shown below and restarted my Kafka Docker instance, but I still see this error:
org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
Below is the configuration I use (collected from Google and Stack Overflow). Here are my Producer, Consumer, and KafkaAdmin Java classes:
KafkaAdminConfig.java:
@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put("max.message.bytes", String.valueOf(maxFileSize));
    configProps.put("max.request.size", maxFileSize);
    configProps.put("replica.fetch.max.bytes", maxFileSize);
    configProps.put("message.max.bytes", maxFileSize);
    configProps.put("max.message.bytes", maxFileSize);
    configProps.put("max.message.max.bytes", maxFileSize);
    configProps.put("max.partition.fetch.bytes", maxFileSize);
    return new KafkaAdmin(configProps);
}
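If I understand the docs correctly, `message.max.bytes` is a broker-level property and `max.message.bytes` its per-topic counterpart, so I am not sure they have any effect inside the AdminClient's own config map. One way to raise the limit for an existing topic would be the `kafka-configs` CLI; a sketch, where the topic name (`my-topic`), container name (`kafka`), and bootstrap address are placeholders:

```shell
# Raise the per-topic message size limit to 104857600 bytes.
# "kafka" is the Docker container name, "my-topic" the topic — both placeholders.
docker exec kafka kafka-configs --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name my-topic \
  --alter --add-config max.message.bytes=104857600
```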
ProducerConfig.java
@Bean
public ProducerFactory<String, Byte[]> producerFactoryLargeFiles() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
// required to allow Kafka to process files <= 20 MB
configProps.put("buffer.memory", maxFileSize);
configProps.put("max.request.size", maxFileSize);
configProps.put("replica.fetch.max.bytes", maxFileSize);
configProps.put("message.max.bytes", maxFileSize);
configProps.put("max.message.bytes", maxFileSize);
configProps.put("acks", "all");
configProps.put("retries", 0);
configProps.put("batch.size", 16384);
configProps.put("linger.ms", 1);
return new DefaultKafkaProducerFactory<>(configProps);
}
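If I understand correctly, of the keys above only `max.request.size` and `buffer.memory` are producer-side settings; `message.max.bytes`, `max.message.bytes`, and `replica.fetch.max.bytes` are broker/topic properties and are presumably ignored when passed to a producer. So the producer-side part would reduce to (values from this question):

```
max.request.size=104857600
buffer.memory=104857600
```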
ConsumerConfig.java
@Bean
public ConsumerFactory<String, String> consumerFactoryLargeFiles() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// required to allow Kafka to process files <= 20 MB
props.put("fetch.message.max.bytes", maxFileSize);
return new DefaultKafkaConsumerFactory<>(props);
}
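I suspect `fetch.message.max.bytes` belongs to the old (pre-0.9) consumer API; the current consumer seems to use `max.partition.fetch.bytes` and `fetch.max.bytes` instead, e.g. (values from this question):

```
max.partition.fetch.bytes=104857600
fetch.max.bytes=104857600
```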
maxFileSize is 104857600 bytes (100 MiB). The message I am trying to send is only about 3 MB.
I also added the following environment variables to my docker-compose file:
KAFKA_MAX_REQUEST_SIZE: 104857600
KAFKA_PRODUCER_MAX_REQUEST_SIZE: 104857600
CONNECT_PRODUCER_MAX_REQUEST_SIZE: 104857600
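As far as I understand, the cp-kafka image maps environment variables of the form `KAFKA_<PROPERTY_NAME>` onto broker properties (underscores for dots), and `max.request.size` is not a broker property, so the broker-side limit would rather be set like this (a docker-compose fragment under that assumption):

```
environment:
  KAFKA_MESSAGE_MAX_BYTES: 104857600
  KAFKA_REPLICA_FETCH_MAX_BYTES: 104857600
```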
I will be happy to provide additional information or logs if needed.