
I am writing a DataFrame with 30,000 entries to Kafka using the parameters below:

    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_brokers)
    .option("kafka.compression.type","lz4")
    .option("kafka.max.request.size", 1048576)
    .option("kafka.message.max.bytes", 750000)
    .option("kafka.max.request.size",750000)
    .option("kafka.max.partition.fetch.bytes",750000)
    .option("kafka.batch.size", 100)
    .option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    .option("value.serializer", "org.apache.kafka.common.serialization.JsonSerializer")
    .option("topic", product_kafka_topic)
    .option("partition",15)

What I am not getting is how Spark is partitioning the DataFrame. I get the error:

    org.apache.kafka.common.errors.RecordTooLargeException: The message is 10540452 bytes when serialized which is larger than 750000, which is the value of the max.request.size configuration.

How can I solve it?

Sandeep540

1 Answer


> The message is 10540452 bytes when serialized which is larger than 750000

You have explicitly capped the message size:

    .option("kafka.message.max.bytes", 750000)

> What I am not getting is how Spark is partitioning the DataFrame

Spark takes your key and value columns (plus, optionally, topic, partition, and timestamp), packages each row into a Kafka record, and sends those records to the brokers in batched produce requests.
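As a minimal sketch of what the writer expects (assuming a hypothetical DataFrame df, and borrowing the pbpdID column mentioned in the comments as a record key):

    from pyspark.sql.functions import col, struct, to_json

    # The Kafka sink only consumes reserved columns: key, value, and
    # optionally topic, partition, headers, timestamp. Serialization
    # happens here in the DataFrame, not through serializer options.
    out = df.select(
        col("pbpdID").cast("string").alias("key"),                      # record key
        to_json(struct(*[col(c) for c in df.columns])).alias("value"),  # record value as a JSON string
    )

    (out.write
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_brokers)
        .option("topic", product_kafka_topic)
        .save())

Each row of out becomes exactly one Kafka record, so a 10 MB row produces a 10 MB record no matter how the DataFrame is partitioned.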

.option("kafka.max.request.size", 1048576)
.option("kafka.max.request.size", 750000)

Because you repeated the option, the second value (750000) is the one that ends up in the producer config.

More than one message can fit in a request, but batch.size is measured in bytes, not number of records:

    .option("kafka.batch.size", 100)

This isn't something that can be fixed in Spark alone; the broker will also deny large messages. See the solutions in How can I send large messages with Kafka (over 15MB)?
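If the broker limits truly cannot be raised, one Spark-side mitigation is to measure each serialized value and set oversized rows aside before producing. A sketch, reusing the out DataFrame from above; the 750000 threshold is the cap from the question:

    from pyspark.sql.functions import col, expr

    MAX_BYTES = 750000  # must stay at or below max.request.size / message.max.bytes

    # octet_length counts bytes rather than characters, which is what Kafka enforces.
    sized = out.withColumn("value_bytes", expr("octet_length(value)"))

    ok = sized.filter(col("value_bytes") <= MAX_BYTES).drop("value_bytes")
    too_big = sized.filter(col("value_bytes") > MAX_BYTES)  # inspect, split, or drop

Writing ok to Kafka and dumping too_big to storage keeps the job from failing while you decide how to shrink or split the large records.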


Aside: Structured Streaming does not use key.serializer, value.serializer, or partition as configuration options.

OneCricketeer
  • I cannot change Kafka, it's not in my control. I put .repartition(2500, col("pbpdID"), col("productName"), col("seasonName"), col("createdDate")) and it partitioned the data, but 2% of the data is still problematic; guess I need to filter. – Sandeep540 Jan 29 '21 at 18:06
  • When you write the data to the topic, it only accepts those columns that I listed, not other names. – OneCricketeer Jan 29 '21 at 18:08