
Below is my producer configuration. As you can see, the compression type is set to gzip, yet the message is not publishing and fails with the error below:

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, edi856KafkaConfig.getBootstrapServersConfig());
    props.put(ProducerConfig.RETRIES_CONFIG, edi856KafkaConfig.getRetriesConfig());
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, edi856KafkaConfig.getBatchSizeConfig());
    props.put(ProducerConfig.LINGER_MS_CONFIG, edi856KafkaConfig.getIntegerMsConfig());
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, edi856KafkaConfig.getBufferMemoryConfig());
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(Edi856KafkaProducerConstants.SSL_PROTOCOL, edi856KafkaConfig.getSslProtocol());
    props.put(Edi856KafkaProducerConstants.SECURITY_PROTOCOL, edi856KafkaConfig.getSecurityProtocol());
    props.put(Edi856KafkaProducerConstants.SSL_KEYSTORE_LOCATION, edi856KafkaConfig.getSslKeystoreLocation());
    props.put(Edi856KafkaProducerConstants.SSL_KEYSTORE_PASSWORD, edi856KafkaConfig.getSslKeystorePassword());
    props.put(Edi856KafkaProducerConstants.SSL_TRUSTSTORE_LOCATION, edi856KafkaConfig.getSslTruststoreLocation());
    props.put(Edi856KafkaProducerConstants.SSL_TRUSTSTORE_PASSWORD, edi856KafkaConfig.getSslTruststorePassword());
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");

The error I am getting is:

org.apache.kafka.common.errors.RecordTooLargeException: The message is 1170632 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
2017-12-07_12:34:10.037 [http-nio-8080-exec-1] ERROR c.tgt.trans.producer.Edi856Producer - Exception while writing mesage to topic= '{}'
org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 1170632 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

Also, what consumer configuration do we need to use if I want a string representation of the Kafka message on the consumer side?

Bravo

2 Answers


Unfortunately you're encountering a rather odd issue with the new Producer implementation in Kafka.

Although the message size limit that Kafka applies at the broker level covers a single compressed record set (potentially containing multiple messages), the new producer currently applies the max.request.size limit to the record prior to any compression.

This has been captured in https://issues.apache.org/jira/browse/KAFKA-4169 (created 14/Sep/16 and unresolved at time of writing).
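To see why this pre-compression check is so frustrating, consider how well a repetitive payload (such as an EDI document) compresses under gzip. The following self-contained sketch builds a payload of roughly the same size as the failing record (the `"EDI856-SEGMENT|"` content and repeat count are illustrative, not from the question) and compares raw versus gzipped sizes:

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.GZIPOutputStream;

public class GzipRatioDemo {
    public static void main(String[] args) throws Exception {
        // A repetitive ~1.2 MB payload, similar in size to the 1170632-byte record
        // from the error message (content is illustrative)
        byte[] raw = "EDI856-SEGMENT|".repeat(80_000).getBytes();

        // Compress it with gzip, the same codec configured on the producer
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(raw);
        }
        byte[] compressed = bos.toByteArray();

        // The raw payload exceeds 1 MB, but the compressed form is far smaller
        System.out.println(raw.length > 1_000_000);
        System.out.println(compressed.length < 100_000);
    }
}
```

A record like this would sail under the broker's limit once compressed, yet the producer rejects it before compression ever happens.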

If you are certain that the compressed size of your message (plus any overhead of the record set) will be smaller than the broker's configured max.message.bytes, you may be able to get away with increasing the value of the max.request.size property on your Producer without having to change any configuration on the broker. This allows the Producer code to accept the size of the pre-compression payload, which is then compressed and sent to the broker.
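Concretely, that means adding one more entry to the properties shown in the question. A minimal, self-contained sketch, using the raw config key `"max.request.size"` (which is what `ProducerConfig.MAX_REQUEST_SIZE_CONFIG` resolves to); the 2 MB value is just an example comfortably above the 1170632-byte record from the error:

```java
import java.util.Properties;

public class ProducerSizeConfig {
    // Hypothetical helper: raises the producer's max.request.size so the
    // pre-compression size check accepts larger records
    static Properties withLargerRequestSize(Properties props, int maxRequestSizeBytes) {
        props.put("max.request.size", Integer.toString(maxRequestSizeBytes));
        return props;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // 2 MB: comfortably above the 1170632-byte record from the error
        withLargerRequestSize(props, 2 * 1024 * 1024);
        System.out.println(props.getProperty("max.request.size"));
    }
}
```

In the question's code this is simply one more `props.put(...)` line alongside the existing producer settings.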

However, it is important to note that if the Producer sends a request that is too large for the broker's configuration, the broker will reject the message, and it will be up to your application to handle this correctly.

tKe
  • Confirming that in kafka python creating the Producer as `KafkaProducer(compression_type='gzip', max_request_size=2 ** 20 * 10) # 10 MBs` gets the message through – Mr_and_Mrs_D Oct 07 '18 at 14:57

Just read the error message :)

The message is 1170632 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration

The message is larger than 1 MB, which is the default maximum allowed by Apache Kafka. To allow larger messages, check the answers in How can I send large messages with Kafka (over 15MB)?
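For reference, raising the producer's max.request.size alone stops helping once messages exceed the broker's own limits; broker, replica, and consumer settings along these lines also need adjusting (the 15 MB values below are illustrative, matching the linked question's scenario):

```properties
# server.properties (broker) -- illustrative values, ~15 MB
message.max.bytes=15728640
replica.fetch.max.bytes=15728640

# consumer side, so the larger records can actually be fetched
max.partition.fetch.bytes=15728640
```

Each limit applies at a different hop (producer request, broker record set, replica fetch, consumer fetch), so all of them must accommodate the largest expected message.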