
I am reading a stream of data from a Kafka topic using Structured Streaming with Update mode, and then doing some transformations.

Then I have created a JDBC sink to push the data to a MySQL sink in Append mode. The problem is: how do I tell the sink which column is my primary key and have it update based on that key, so that my table does not end up with duplicate rows?

    import scala.concurrent.duration._
    import org.apache.spark.sql.{DataFrame, Dataset, SaveMode}
    import org.apache.spark.sql.streaming.{OutputMode, Trigger}

    val df: DataFrame = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<List-here>")
      .option("subscribe", "emp-topic")
      .load()

    import spark.implicits._

    // value in Kafka is bytes, so cast it to String
    val empList: Dataset[Employee] = df
      .selectExpr("CAST(value AS STRING)")
      .map(row => Employee(row.getString(0)))

    // window aggregations on 1 min windows
    val aggregatedDf = ......

    // How to tell here that id is my primary key and do the update
    // based on the id column?
    aggregatedDf
      .writeStream
      .trigger(Trigger.ProcessingTime(60.seconds))
      .outputMode(OutputMode.Update)
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF
          .select("id", "name", "salary", "dept")
          .write.format("jdbc")
          .option("url", "jdbc:mysql://localhost/empDb")
          .option("driver", "com.mysql.cj.jdbc.Driver")
          .option("dbtable", "empDf")
          .option("user", "root")
          .option("password", "root")
          .mode(SaveMode.Append)
          .save()
      }
      .start()
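
For context, the elided aggregation might look roughly like the sketch below. The event-time column "timestamp", the watermark, and the sum over salary are assumptions for illustration only, since the actual aggregation is not shown in the question:

    import org.apache.spark.sql.functions._

    // Hypothetical 1-minute tumbling window; assumes Employee has an event-time
    // column named "timestamp" and that salary is summed per id/name/dept.
    val aggregatedDf = empList
      .withWatermark("timestamp", "2 minutes")
      .groupBy(window(col("timestamp"), "1 minute"), col("id"), col("name"), col("dept"))
      .agg(sum("salary").as("salary"))
      .select("id", "name", "salary", "dept")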

1 Answer


One way is to use ON DUPLICATE KEY UPDATE together with foreachPartition; that may serve this purpose.

Below is a pseudo-code snippet:

    import java.sql.{Connection, DriverManager}
    import org.apache.spark.sql.DataFrame

    /**
      * Insert into the database using foreachPartition.
      * @param dataframe : DataFrame to write
      * @param sqlDatabaseConnectionString : JDBC connection string
      * @param sqlTableName : target table name
      * @param numPartitions : number of simultaneous DB connections you plan to allow
      */
    def insertToTable(dataframe: DataFrame,
                      sqlDatabaseConnectionString: String,
                      sqlTableName: String,
                      numPartitions: Int): Unit = {

      // repartition so that at most numPartitions connections hit the database at once
      val repartitionedDf = dataframe.repartition(numPartitions)
      val tableHeader: String = repartitionedDf.columns.mkString(",")

      repartitionedDf.foreachPartition { partition =>
        // Note: one connection per partition (a better way is to use a connection pool)
        val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString)
        // Batch size of 1000 is used since some databases can't handle batches larger than 1000, e.g. Azure SQL
        partition.grouped(1000).foreach { group =>
          // build one multi-row VALUES list per group, quoting each value
          val insertString: String = group
            .map(record => record.mkString("('", "','", "')"))
            .mkString(",")

          val sql =
            s"""
               | INSERT INTO $sqlTableName ($tableHeader)
               | VALUES $insertString
               | ON DUPLICATE KEY UPDATE
               | yourNonKeyColumn = VALUES(yourNonKeyColumn)
             """.stripMargin
          // MySQL matches on the table's primary/unique key automatically;
          // list the non-key columns you want refreshed in the UPDATE clause.

          sqlExecutorConnection.createStatement().executeUpdate(sql)
        }
        sqlExecutorConnection.close() // close the connection once the partition is done
      }
    }
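
To wire this back into the question's streaming query, you could call the function from foreachBatch along these lines (a sketch only; the connection string, table name, and partition count are illustrative values):

    aggregatedDf
      .writeStream
      .trigger(Trigger.ProcessingTime(60.seconds))
      .outputMode(OutputMode.Update)
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        insertToTable(
          batchDF.select("id", "name", "salary", "dept"),
          "jdbc:mysql://localhost/empDb?user=root&password=root",
          "empDf",
          numPartitions = 4) // illustrative value
      }
      .start()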

You can use a PreparedStatement instead of a plain JDBC Statement.
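
For instance, a PreparedStatement-based variant could look like the sketch below (the column names id, name, salary, dept and their types are assumptions taken from the question's schema):

    import java.sql.{Connection, DriverManager, PreparedStatement}
    import org.apache.spark.sql.Row

    def upsertPartition(partition: Iterator[Row], connectionString: String, tableName: String): Unit = {
      val connection: Connection = DriverManager.getConnection(connectionString)
      val statement: PreparedStatement = connection.prepareStatement(
        s"""INSERT INTO $tableName (id, name, salary, dept) VALUES (?, ?, ?, ?)
           |ON DUPLICATE KEY UPDATE name = VALUES(name), salary = VALUES(salary), dept = VALUES(dept)
           |""".stripMargin)
      try {
        partition.grouped(1000).foreach { group =>
          group.foreach { row =>
            statement.setLong(1, row.getAs[Long]("id"))        // assumed column types
            statement.setString(2, row.getAs[String]("name"))
            statement.setDouble(3, row.getAs[Double]("salary"))
            statement.setString(4, row.getAs[String]("dept"))
            statement.addBatch()
          }
          statement.executeBatch() // one round trip per group of 1000 rows
        }
      } finally {
        statement.close()
        connection.close()
      }
    }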

Further reading: SPARK SQL - update MySql table using DataFrames and JDBC

  • Thanks for the answer. But the question you referred to is about 3 years old, so I am wondering whether there is any newer feature for this in the current version, Spark 2.4.0. – thedevd May 03 '19 at 05:39
  • The above approach will work with the current version of Spark as well, since it operates at the RDD/partition level. – Ram Ghadiyaram May 03 '19 at 14:28
  • For now I am marking the answer as accepted, as I have not found any better alternative in Spark. – thedevd Jun 12 '19 at 06:51