I have a very large PySpark DataFrame. I need to convert each row of the DataFrame into a JSON-formatted string and then publish that string to a Kafka topic. I originally used the following code.
for message in df.toJSON().collect():
    kafkaClient.send(message)
However, the DataFrame is very large, so it fails when trying to collect().
I was thinking of using a UDF instead, since it processes the data row by row.
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType

def get_row(row):
    json = row.toJSON()
    kafkaClient.send(json)
    return "Sent"

send_row_udf = udf(get_row, StringType())
df_json = df.withColumn("Sent", get_row(struct([df[x] for x in df.columns])))
df_json.select("Sent").show()
But I am getting an error, because the column is passed to the function and not the row.
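Below is a sketch of what I think the call would have to look like instead, going through the registered UDF and building the JSON from the Row with json.dumps; I'm not sure kafkaClient can even be pickled and used on the executors, so I may be off track here:

import json
from pyspark.sql.functions import struct, udf
from pyspark.sql.types import StringType

# Sketch: build the JSON string from the Row's dict, send it, and return a marker.
# Assumes kafkaClient is available and usable on the executors, which I have not verified.
def send_row(row):
    message = json.dumps(row.asDict())
    kafkaClient.send(message)
    return "Sent"

send_row_udf = udf(send_row, StringType())
df_sent = df.withColumn("Sent", send_row_udf(struct([df[x] for x in df.columns])))
df_sent.select("Sent").show()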
For illustrative purposes, we can use the df below, where we can assume Col1 and Col2 must be sent over.
df = spark.createDataFrame([("A", 1), ("B", 2), ("D", 3)], ["Col1", "Col2"])
The JSON string for each row:
'{"Col1":"A","Col2":1}'
'{"Col1":"B","Col2":2}'
'{"Col1":"D","Col2":3}'