I am not able to send a DataFrame to Kafka as a comma-separated JSON object for a larger data set.
Working code for a smaller data set:
df.selectExpr("CAST(collect_list(to_json(struct(*))) AS STRING) AS value") \
.write.format("kafka")\
.option("compression", "gzip")\
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "JsonFormat") \
.option("kafka.request.timeout.ms", 120000) \
.option("kafka.linger.ms", 10) \
.option("compression", "gzip")\
.option("kafka.retries", 3) \
.save()
spark.stop()
Output:
[{
"firstname": "James",
"middlename": "",
"lastname": "Smith",
"id": "36636",
"gender": "M",
"salary": 3000
}, {
"firstname": "Michael",
"middlename": "Rose",
"lastname": "",
"id": "40288",
"gender": "M",
"salary": 4000
}, {
"firstname": "Robert",
"middlename": "",
"lastname": "Williams",
"id": "42114",
"gender": "M",
"salary": 4000
}, {
"firstname": "Maria",
"middlename": "Anne",
"lastname": "Jones",
"id": "39192",
"gender": "F",
"salary": 4000
}, {
"firstname": "Satish",
"middlename": "Anjaneyapp",
"lastname": "Brown",
"id": "",
"gender": "F",
"salary": -1
}]
Actual Problem
For a larger data set, collect_list(to_json(struct(*))) tries to collect the whole data set into a single record and send it through Kafka. We get the error below:
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 51312082 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
Limitation:
I can send only 1 MB per message through Kafka.
Is there a way to break the data into messages of at most 1 MB each and still send each chunk as comma-separated JSON objects (roughly along the lines of the sketch below)?
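To illustrate the kind of thing I am hoping for, here is a rough, untested sketch. The 1000 rows per message is only a placeholder I would still have to tune so that each serialized array stays under 1 MB:
from pyspark.sql import functions as F

# Placeholder chunk size: would need tuning so each serialized array stays under 1 MB.
rows_per_message = 1000

# Tag each row with an (approximate) chunk id, then build one JSON array per chunk.
# Note: monotonically_increasing_id is not contiguous, so chunk sizes are only approximate.
chunked = df.selectExpr("*", "to_json(struct(*)) AS json") \
.withColumn("row_id", F.monotonically_increasing_id()) \
.withColumn("chunk", (F.col("row_id") / rows_per_message).cast("long")) \
.groupBy("chunk") \
.agg(F.concat_ws(",", F.collect_list("json")).alias("body")) \
.selectExpr("concat('[', body, ']') AS value")

# One Kafka message per chunk instead of one giant message.
chunked.write.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "JsonFormat") \
.save()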
I tried the configurations below (applied roughly as in the snippet after the list), but no luck:
kafka.linger.ms
batch.size
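For reference, I passed them through the Kafka sink options, roughly as follows (the batch.size value shown is just one of the values I experimented with):
# Producer configs are passed to the sink with the "kafka." prefix.
df.selectExpr("CAST(collect_list(to_json(struct(*))) AS STRING) AS value") \
.write.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "JsonFormat") \
.option("kafka.linger.ms", 10) \
.option("kafka.batch.size", 16384) \
.save()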