0

my use case is to read Kafka messages with structured streaming and use foreachBatch to push those messages into HBase by using some bulk Put to gain some performance over single Put, I am able to push messages using foreach (thanks to Spark Structured Streaming with Hbase integration) but not able to do the same for foreachBatch operation.

Can someone please help with this ? Attaching the code below.

KafkaStructured.scala :


package com.test

import java.math.BigInteger
import java.util

import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql._
import org.apache.spark.sql.functions._


object KafkaStructured {

  @JsonIgnoreProperties(ignoreUnknown = true)
  case class Header(field1: String, field2: String, field3: String)

  @JsonIgnoreProperties(ignoreUnknown = true)
  case class Body(fieldx: String)

  @JsonIgnoreProperties(ignoreUnknown = true)
  case class Event(header: Header, body: Body)

  @JsonIgnoreProperties(ignoreUnknown = true)
  case class KafkaResp(event: Event)

  @JsonIgnoreProperties(ignoreUnknown = true)
  case class HBaseDF(field1: String, field2: String, field3: String)


  def main(args: Array[String]): Unit = {

    val jsonSchema = Encoders.product[KafkaResp].schema

    val spark = SparkSession
      .builder()
      .appName("Kafka Spark")
      .getOrCreate()

    val df = spark
      .readStream
      .format("kafka")
      .option...
      .load()

    import spark.sqlContext.implicits._

    val flattenedDf: DataFrame =
      df
        .select($"value".cast("string").as("json"))
        .select(from_json($"json", jsonSchema).as("data"))
        .select("data.event.header.field1", "data.event.header.field2", "data.event.header.field3")

    val hbaseDf = flattenedDf
      .as[HBaseDF]
      .filter(hbasedf => hbasedf != null && hbasedf.field1 != null)

    flattenedDf
      .writeStream
      .option("truncate", "false")
      .option("checkpointLocation", "some hdfs location")
      .format("console")
      .outputMode("append")
      .start()

    def bytes(data: String) = {
      val bytes = data match {
        case data if data != null && !data.isEmpty => Bytes.toBytes(data)
        case _ => Bytes.toBytes("")
      }
      bytes
    }

   
    hbaseDf
      .writeStream
      .foreachBatch(function = (batchDf, batchId) => {
        val putList = new util.ArrayList[Put]()
        batchDf
          .foreach(row => {
            val p: Put = new Put(bytes(row.field1))
            val cfName= bytes("fam1")
            p.addColumn(cfName, bytes("field1"), bytes(row.field1))
            p.addColumn(cfName, bytes("field2"), bytes(row.field2))
            p.addColumn(cfName, bytes("field3"), bytes(row.field3))
            putList.add(p)
          })
        new HBaseBulkForeachWriter[HBaseDF] {
          override val tableName: String = "<my table name>"
        
          override def bulkPut: util.ArrayList[Put] = {
            putList
          }
        }
      }
      )
      .start()

    spark.streams.awaitAnyTermination()
  }

}

HBaseBulkForeachWriter.scala :


package com.test

import java.util
import java.util.concurrent.ExecutorService

import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.security.User
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.ForeachWriter

import scala.collection.mutable

trait HBaseBulkForeachWriter[RECORD] extends ForeachWriter[RECORD] {

  val tableName: String
  val hbaseConfResources: mutable.Seq[String] = mutable.Seq("location for core-site.xml", "location for hbase-site.xml")

  def pool: Option[ExecutorService] = None

  def user: Option[User] = None

  private var hTable: Table = _
  private var connection: Connection = _

  override def open(partitionId: Long, version: Long): Boolean = {
    connection = createConnection()
    hTable = getHTable(connection)
    true
  }

  def createConnection(): Connection = {
    val hbaseConfig = HBaseConfiguration.create()
    hbaseConfResources.foreach(hbaseConfig.addResource)
    ConnectionFactory.createConnection(hbaseConfig, pool.orNull, user.orNull)
  }

  def getHTable(connection: Connection): Table = {
    connection.getTable(TableName.valueOf(tableName))
  }

  override def process(record: RECORD): Unit = {
    val put = bulkPut
    hTable.put(put)
  }

  override def close(errorOrNull: Throwable): Unit = {
    hTable.close()
    connection.close()
  }

  def bulkPut: util.ArrayList[Put]
}
p2s
  • 1
  • 2

1 Answers1

1

foreachBatch allow you to use foreachPartition inside the function. The code executed inside a foreachPartition only runs once per executor.

So you can create a function to create a put:

def putValue(key: String, columnName: String, data: Array[Byte]): Put = {
    val put = new Put(Bytes.toBytes(key))
    put.addColumn(Bytes.toBytes("colFamily"), Bytes.toBytes(columnName), data)
  }

Then a function to bulk insert the puts

def writePutList(putList: List[Put]): Unit = {
    val config: Configuration = HBaseConfiguration.create()
    config.set("hbase.zookeeper.quorum", zookeperUrl)

    val connection: Connection = ConnectionFactory.createConnection(config)
    val table = connection.getTable(TableName.valueOf(tableName))
    table.put(putList.asJava)
    logger.info("INSERT record[s] " + putList.size + " to table " + tableName + " OK.")
    table.close()
    connection.close()
  }
   

And use them inside a foreachPartition and a map

 def writeFunction: (DataFrame, Long) => Unit = {
    (batchData, id) => {
      batchData.foreachPartition(
        partition => {  
          val putList = partition.map(
            data =>
             putValue(data.getAs[String]("keyField"), "colName", Bytes.toBytes(data.getAs[String]("valueField")))
          ).toList
         writePutList(putList)
        }
      )
    }
  }

And finally use the function created in your streaming query:

 df.writeStream
      .queryName("yourQueryName")
      .option("checkpointLocation", checkpointLocation)
      .outputMode(OutputMode.Update())
      .foreachBatch(writeFunction)
      .start()
      .awaitTermination()
jotarada
  • 21
  • 2