
I am not able to send a dataframe as a comma-separated JSON object for a larger data set.

Working code for a smaller data set:

    # Collect the whole dataframe into a single JSON array string and write it as one Kafka record.
    df.selectExpr("CAST(collect_list(to_json(struct(*))) AS STRING) AS value") \
        .write.format("kafka") \
        .option("compression", "gzip") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "JsonFormat") \
        .option("kafka.request.timeout.ms", 120000) \
        .option("kafka.linger.ms", 10) \
        .option("kafka.retries", 3) \
        .save()
    spark.stop()

Output:

[{
    "firstname": "James",
    "middlename": "",
    "lastname": "Smith",
    "id": "36636",
    "gender": "M",
    "salary": 3000
}, {
    "firstname": "Michael",
    "middlename": "Rose",
    "lastname": "",
    "id": "40288",
    "gender": "M",
    "salary": 4000
}, {
    "firstname": "Robert",
    "middlename": "",
    "lastname": "Williams",
    "id": "42114",
    "gender": "M",
    "salary": 4000
}, {
    "firstname": "Maria",
    "middlename": "Anne",
    "lastname": "Jones",
    "id": "39192",
    "gender": "F",
    "salary": 4000
}, {
    "firstname": "Satish",
    "middlename": "Anjaneyapp",
    "lastname": "Brown",
    "id": "",
    "gender": "F",
    "salary": -1
}]

Actual Problem

For a larger data set, `collect_list(to_json(struct(*)))` tries to collect a huge amount of data and send it through Kafka as a single message. We are getting the error below:

Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 51312082 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

Limitation:

I can send only 1 MB per message through Kafka.

Is there a way we can break the message into chunks of up to 1 MB and send the comma-separated JSON objects?

Tried the configurations below, but no luck (a sketch of how they can be passed is shown after this list):

`kafka.linger.ms`

`batch.size`
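
The sketch below assumes the producer configs are passed with the `kafka.` prefix, as the Spark Kafka sink requires; the option values are illustrative rather than the exact ones tried:

    # Sketch only: batching settings passed through to the Kafka producer.
    # Note that linger.ms and batch.size only control how many small records are
    # batched together; they do not raise the per-record size limit that triggers
    # RecordTooLargeException.
    df.selectExpr("CAST(collect_list(to_json(struct(*))) AS STRING) AS value") \
        .write.format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "JsonFormat") \
        .option("kafka.linger.ms", 10) \
        .option("kafka.batch.size", 16384) \
        .save()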


1 Answer


Don't comma-separate your JSON objects; then the records won't be valid JSON. You also shouldn't break the data into "1 MB chunks", because then you'll have incomplete strings being sent to different partitions, and you have no easy way to determine the ordering needed to put them back together in any consumer.

Remove the `collect_list` call and instead ensure your dataframe has a string `value` column containing one valid JSON object per row. The Kafka writer will then write each row as a separate message.
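
For example, a minimal sketch of that per-row approach (assuming the same dataframe, broker, and topic as in the question; each row becomes its own record, which stays under the 1 MB limit as long as no single row exceeds it):

    # Each row is serialized to one JSON object and written as one Kafka message;
    # without collect_list, nothing is accumulated into a single oversized record.
    df.selectExpr("to_json(struct(*)) AS value") \
        .write.format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "JsonFormat") \
        .save()

A consumer then reads each message as a standalone JSON object and can rebuild a list on its side if one is really needed.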

  • I have reposted the output. It's valid JSON. I need this for a larger dataset; basically we need to append a comma after each message. – Joe May 20 '22 at 17:57
  • Why does every JSON object need to be part of the _same message_? `collect_list` already puts a comma after each message **but** also puts `[]` around that data to make it a proper JSON array. You **cannot** have data like `{"k":"v"},{"k":"v"}`; that cannot be parsed using standard JSON parsers in Kafka. Your only other option is to [change the allowed message size beyond 1MB](https://stackoverflow.com/questions/21020347/how-can-i-send-large-messages-with-kafka-over-15mb) – OneCricketeer May 20 '22 at 18:50
  • We are able to achieve `[{},{}]` by using `collect_list`, but it is not working for a larger dataset. I am looking for a solution to limit the size to 1 MB; our environment won't support more than 1 MB. – Joe May 20 '22 at 21:42
  • I said `{},{}` wouldn't work (without `collect_list`), not `[{},{}]` which will work... If you aren't willing to send each JSON object as individual records rather than a list, or change your broker settings (a sketch of those settings follows this thread), I'm not sure what else to tell you. That's just Kafka's limitations. – OneCricketeer May 20 '22 at 21:45
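
For completeness, a hedged sketch of the "raise the size limits" route mentioned above; these are standard Kafka settings, the values are purely illustrative, and this only applies if the environment actually allows messages larger than 1 MB:

    # Producer side (Spark Kafka sink): allow a serialized request/record above 1 MB (~50 MB here, illustrative).
    # The broker must also permit it (message.max.bytes, or max.message.bytes per topic),
    # and consumers may need max.partition.fetch.bytes raised accordingly.
    df.selectExpr("CAST(collect_list(to_json(struct(*))) AS STRING) AS value") \
        .write.format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "JsonFormat") \
        .option("kafka.max.request.size", 52428800) \
        .save()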