0

I have researched different configs even from Stackoverflow, but really stucked with it for several days so created separate question for it. I am trying to configure Kafka to send large messages (10-50 mbytes). I run Kafka in Docker (version is confluentinc/cp-kafka:7.2.1). I also understand that Kafka is the not the best instrument for it. I am trying to config Kafka from Java the way below, and restarted my Kafka Docker instance, but still see the error message:

org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

And below is config which I use (from Google and Stackoverflow).

  1. Here are my Producer and Consumer and KafkaAdmin java classes: KafkaAdminConfig.java:

    @Bean public KafkaAdmin kafkaAdmin() { Map<String, Object> configProps = new HashMap<>(); configProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put("max.message.bytes", String.valueOf(maxFileSize)); configProps.put("max.request.size", maxFileSize); configProps.put("replica.fetch.max.bytes", maxFileSize); configProps.put("message.max.bytes", maxFileSize); configProps.put("max.message.bytes", maxFileSize); configProps.put("max.message.max.bytes", maxFileSize); configProps.put("max.partition.fetch.bytes", maxFileSize); return new KafkaAdmin(configProps); }

ProducerConfig.java

@Bean
public ProducerFactory<String, Byte[]> producerFactoryLargeFiles() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

    //required to allow Kafka process files <= 20 mb
    configProps.put("buffer.memory", maxFileSize);
    configProps.put("max.request.size", maxFileSize);
    configProps.put("replica.fetch.max.bytes", maxFileSize);
    configProps.put("message.max.bytes", maxFileSize);
    configProps.put("max.message.bytes", maxFileSize);
    configProps.put("acks", "all");
    configProps.put("retries", 0);
    configProps.put("batch.size", 16384);
    configProps.put("linger.ms", 1);

    return new DefaultKafkaProducerFactory<>(configProps);
}
    

ConsumerConfig.java

@Bean
    public ConsumerFactory<String, String>  consumerFactoryLargeFiles() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        //required to allow Kafka process files <= 20 mb
        props.put("fetch.message.max.bytes", maxFileSize);

        return new DefaultKafkaConsumerFactory<>(props);
    }

maxFileSize is 104857600 - it is about 104Mb. And I am trying to send message about 3MB.

  1. I also added following env variables to my docker compose:

KAFKA_MAX_REQUEST_SIZE: 104857600 KAFKA_PRODUCER_MAX_REQUEST_SIZE: 104857600 CONNECT_PRODUCER_MAX_REQUEST_SIZE: 104857600

I will be happy to provide additional information or logs if need.

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
Aleksandra
  • 13
  • 3
  • Did you actually override the property set? In any case, this post had more details you can try: https://stackoverflow.com/questions/21020347/how-can-i-send-large-messages-with-kafka-over-15mb – Tim Aug 15 '22 at 10:26

2 Answers2

0

I suggest to upload file with other means and send file name via Kafka

sax
  • 73
  • 3
  • thank you. -I agree your approach is better. And we should follow it in production systems(or use another brokers not kafka for large message processing). But this is the learning task - to configure Kafka to be able to work with >10mb messages itself. – Aleksandra Aug 15 '22 at 10:44
  • This is more of a comment than answer – OneCricketeer Aug 15 '22 at 14:14
0

I resolved by myself - maybe this will be helpful for other people. I followed following manual by Baeldung - https://www.baeldung.com/java-kafka-send-large-message:

  1. Kafka topic config via java
  2. Kafka broker config via env variable cause I am using Docker:

KAFKA_MESSAGE_MAX_BYTES: 20971520

  1. Kafka Producer config via java
  2. Kafka Consumer config via java
Aleksandra
  • 13
  • 3