I am reading a stream of data from a Kafka topic using Structured Streaming in Update mode, and then applying some transformations.
Then I created a JDBC sink to push the data into MySQL in Append mode. The problem is: how do I tell my sink which column is the primary key, so that it updates existing rows based on that key and my table does not end up with duplicate rows?
// Streaming source: subscribes to the "emp-topic" Kafka topic and loads it as a DataFrame.
val df: DataFrame = {
  val kafkaOptions = Map(
    "kafka.bootstrap.servers" -> "<List-here>",
    "subscribe" -> "emp-topic"
  )
  spark.readStream.format("kafka").options(kafkaOptions).load()
}
import spark.implicits._
// The Kafka `value` column arrives as bytes: cast it to a string, then build one
// Employee from each string payload.
val empList: Dataset[Employee] = {
  val payloads = df.selectExpr("CAST(value AS STRING)")
  payloads.map { record =>
    val rawValue = record.getString(0)
    Employee(rawValue)
  }
}
// Window aggregations on 1-minute windows.
// NOTE: the aggregation expression itself is elided ("......") in this snippet, so the
// line below is a placeholder rather than compilable code.
val aggregatedDf= ......
// Upsert every micro-batch into MySQL, keyed on `id`.
//
// A plain JDBC write with SaveMode.Append can only INSERT, so in Update output mode each
// re-emitted aggregate would be appended again (or rejected once `id` is a primary key).
// Instead, each partition runs a batched `INSERT ... ON DUPLICATE KEY UPDATE`, which makes
// MySQL overwrite the existing row whenever a row with the same `id` already exists.
//
// Prerequisite: `empDf.id` must be declared PRIMARY KEY (or carry a UNIQUE index) in MySQL;
// without it the statement degrades to a plain insert and duplicates remain possible.
// NOTE(review): no `.start()` is visible in this snippet; the query only runs once
// `.start()` is called on the writer built here.
aggregatedDf
  .writeStream
  .trigger(Trigger.ProcessingTime(60.seconds))
  .outputMode(OutputMode.Update)
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Only plain local values are captured, so the partition closure stays serializable.
    val jdbcDriver = "com.mysql.cj.jdbc.Driver"
    val jdbcUrl = "jdbc:mysql://localhost/empDb"
    val jdbcUser = "root"
    val jdbcPassword = "root"
    val rowsPerJdbcBatch = 1000
    val upsertSql =
      """INSERT INTO empDf (id, name, salary, dept)
        |VALUES (?, ?, ?, ?)
        |ON DUPLICATE KEY UPDATE
        |  name = VALUES(name),
        |  salary = VALUES(salary),
        |  dept = VALUES(dept)""".stripMargin

    batchDF
      .select("id", "name", "salary", "dept")
      .rdd
      .foreachPartition { rows =>
        // Runs on the executors: one connection and one prepared statement per partition.
        // Empty partitions are skipped so they do not open a connection for nothing.
        if (rows.nonEmpty) {
          Class.forName(jdbcDriver)
          val connection = java.sql.DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword)
          try {
            val statement = connection.prepareStatement(upsertSql)
            try {
              // Flush in bounded chunks so a large partition does not build one huge batch.
              rows.grouped(rowsPerJdbcBatch).foreach { chunk =>
                chunk.foreach { row =>
                  // Column order matches the select above; setObject lets the driver map
                  // each value's type, so no column types are assumed here.
                  (0 until 4).foreach { i =>
                    statement.setObject(i + 1, row.getAs[AnyRef](i))
                  }
                  statement.addBatch()
                }
                statement.executeBatch()
              }
            } finally {
              statement.close()
            }
          } finally {
            connection.close()
          }
        }
      }
  }