Building off this question, how would one write all the columns of a DataFrame to a Kafka topic?

Currently I have a DataFrame with some columns. I am supposed to write this to Kafka with a key, so I create a new DataFrame from the old one and specify the key and value:

val endDf: DataFrame = midDf.withColumn("key", lit(keyval)).withColumn("value", lit(testVal))

Now when I write this to kafka I specify:

endDf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "test:8808")
  .option("topic", "topic1")
  .save()

This works if the value is a single column. But the initial DataFrame is made up of multiple columns, and I need to write all of them in JSON format.

How would I write all the columns as the value? I feel it revolves around combining the columns from interDf.columns with to_json.


1 Answer

Kafka expects a key and a value; therefore, you have to aggregate all remaining columns (i.e. every column except the key column) into a single value using to_json():

import org.apache.spark.sql.functions._

val value_col_names = endDf.columns.filter(_ != "yourKeyColumn") 

endDf.withColumnRenamed("yourKeyColumn", "key")
     .withColumn("value", to_json(struct(value_col_names.map(col(_)): _*)))
     .select("key", "value")
     .write
     .format("kafka")
     .option("kafka.bootstrap.servers", "test:8808")
     .option("topic", "topic1")
     .save()
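
For reference, here is a minimal, self-contained sketch of what the to_json(struct(...)) step produces before the Kafka write. The column names yourKeyColumn, name, and age are placeholders; run it in a local Spark session:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("to-json-demo").getOrCreate()
import spark.implicits._

val midDf = Seq(("k1", "Alice", 30), ("k2", "Bob", 25)).toDF("yourKeyColumn", "name", "age")

// Every column except the key column becomes part of the JSON value.
val valueColNames = midDf.columns.filter(_ != "yourKeyColumn")

midDf.withColumnRenamed("yourKeyColumn", "key")
     .withColumn("value", to_json(struct(valueColNames.map(col(_)): _*)))
     .select("key", "value")
     .show(false)

// +---+-------------------------+
// |key|value                    |
// +---+-------------------------+
// |k1 |{"name":"Alice","age":30}|
// |k2 |{"name":"Bob","age":25}  |
// +---+-------------------------+

The JSON field order follows the order of the columns passed to struct(), i.e. the original column order with the key column removed.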
Giorgos Myrianthous