my use case is to read Kafka messages with structured streaming and use foreachBatch to push those messages into HBase by using some bulk Put to gain some performance over single Put, I am able to push messages using foreach (thanks to Spark Structured Streaming with Hbase integration) but not able to do the same for foreachBatch operation.
Can someone please help with this ? Attaching the code below.
KafkaStructured.scala :
package com.test
import java.math.BigInteger
import java.util
import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
object KafkaStructured {
@JsonIgnoreProperties(ignoreUnknown = true)
case class Header(field1: String, field2: String, field3: String)
@JsonIgnoreProperties(ignoreUnknown = true)
case class Body(fieldx: String)
@JsonIgnoreProperties(ignoreUnknown = true)
case class Event(header: Header, body: Body)
@JsonIgnoreProperties(ignoreUnknown = true)
case class KafkaResp(event: Event)
@JsonIgnoreProperties(ignoreUnknown = true)
case class HBaseDF(field1: String, field2: String, field3: String)
def main(args: Array[String]): Unit = {
val jsonSchema = Encoders.product[KafkaResp].schema
val spark = SparkSession
.builder()
.appName("Kafka Spark")
.getOrCreate()
val df = spark
.readStream
.format("kafka")
.option...
.load()
import spark.sqlContext.implicits._
val flattenedDf: DataFrame =
df
.select($"value".cast("string").as("json"))
.select(from_json($"json", jsonSchema).as("data"))
.select("data.event.header.field1", "data.event.header.field2", "data.event.header.field3")
val hbaseDf = flattenedDf
.as[HBaseDF]
.filter(hbasedf => hbasedf != null && hbasedf.field1 != null)
flattenedDf
.writeStream
.option("truncate", "false")
.option("checkpointLocation", "some hdfs location")
.format("console")
.outputMode("append")
.start()
def bytes(data: String) = {
val bytes = data match {
case data if data != null && !data.isEmpty => Bytes.toBytes(data)
case _ => Bytes.toBytes("")
}
bytes
}
hbaseDf
.writeStream
.foreachBatch(function = (batchDf, batchId) => {
val putList = new util.ArrayList[Put]()
batchDf
.foreach(row => {
val p: Put = new Put(bytes(row.field1))
val cfName= bytes("fam1")
p.addColumn(cfName, bytes("field1"), bytes(row.field1))
p.addColumn(cfName, bytes("field2"), bytes(row.field2))
p.addColumn(cfName, bytes("field3"), bytes(row.field3))
putList.add(p)
})
new HBaseBulkForeachWriter[HBaseDF] {
override val tableName: String = "<my table name>"
override def bulkPut: util.ArrayList[Put] = {
putList
}
}
}
)
.start()
spark.streams.awaitAnyTermination()
}
}
HBaseBulkForeachWriter.scala :
package com.test
import java.util
import java.util.concurrent.ExecutorService
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.security.User
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.ForeachWriter
import scala.collection.mutable
trait HBaseBulkForeachWriter[RECORD] extends ForeachWriter[RECORD] {
val tableName: String
val hbaseConfResources: mutable.Seq[String] = mutable.Seq("location for core-site.xml", "location for hbase-site.xml")
def pool: Option[ExecutorService] = None
def user: Option[User] = None
private var hTable: Table = _
private var connection: Connection = _
override def open(partitionId: Long, version: Long): Boolean = {
connection = createConnection()
hTable = getHTable(connection)
true
}
def createConnection(): Connection = {
val hbaseConfig = HBaseConfiguration.create()
hbaseConfResources.foreach(hbaseConfig.addResource)
ConnectionFactory.createConnection(hbaseConfig, pool.orNull, user.orNull)
}
def getHTable(connection: Connection): Table = {
connection.getTable(TableName.valueOf(tableName))
}
override def process(record: RECORD): Unit = {
val put = bulkPut
hTable.put(put)
}
override def close(errorOrNull: Throwable): Unit = {
hTable.close()
connection.close()
}
def bulkPut: util.ArrayList[Put]
}