I am trying to read a Kafka stream and save it to Hive as a table.
The consumer code is:
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.kafka.common.serialization.StringDeserializer
object testH {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("MyApp")
      .master("local")
      .enableHiveSupport()
      .getOrCreate()

    // Note: kafkaParams and topics are not used by the structured streaming
    // query below, which takes its Kafka settings via .option() instead
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "my-group-id",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("my-topic")

    // Schema of the JSON payload carried in the Kafka message value
    val schema = new StructType()
      .add("id", IntegerType)
      .add("name", StringType)
      .add("age", IntegerType)
    // Read from the Kafka topic as a streaming DataFrame
    val stream: DataFrame = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "my-topic")
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(value AS STRING)")

    // Parse the JSON data and write to a Hive table
    val query = stream
      .select(from_json(col("value"), schema).as("data"))
      .selectExpr("data.id", "data.name", "data.age")
      .writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        // Append each micro-batch to the Hive table "test"
        batchDF
          .write
          .format("hive")
          .mode(SaveMode.Append)
          .saveAsTable("test")
      }
      .start()

    query.awaitTermination()
  }
}
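For context, the schema above expects JSON values like {"id": 1, "name": "alice", "age": 30}. This is a minimal producer sketch I use to feed test data (illustrative only; the broker address, topic name, and payload just mirror the consumer above):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object testProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    // One record whose JSON fields match the consumer's schema (id, name, age)
    producer.send(new ProducerRecord[String, String]("my-topic", """{"id": 1, "name": "alice", "age": 30}"""))
    producer.close()
  }
}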
My build.sbt:
name := "kafka-spark-hive"
version := "1.0"
scalaVersion := "2.12.15"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-hive" % "3.3.2",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.3.2",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2",
  "org.apache.spark" %% "spark-sql" % "3.3.2",
  "org.apache.spark" %% "spark-streaming" % "3.3.2",
  "org.apache.kafka" % "kafka-clients" % "3.4.0",
  "com.typesafe" % "config" % "1.4.2",
  "org.apache.hive" % "hive-exec" % "3.1.3",
  "org.apache.hive" % "hive-metastore" % "3.1.3",
  "org.apache.hive" % "hive-common" % "3.1.3",
  "org.apache.hadoop" % "hadoop-common" % "3.3.2",
  "org.apache.hadoop" % "hadoop-hdfs" % "3.3.2",
  "org.apache.hadoop" % "hadoop-auth" % "3.3.2"
)
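One thing I suspect but haven't confirmed: Spark 3.3.2's spark-hive bundles its own Hive client (2.3.9), so the explicit Hive 3.1.3 artifacts may be shadowing it on the classpath with an incompatible FileUtils.mkdir signature. A trimmed dependency list I was going to test (a sketch; it just drops the explicit Hive/Hadoop jars and the DStream artifacts the code doesn't use, relying on spark-sql-kafka-0-10 to pull in kafka-clients transitively):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.3.2",
  "org.apache.spark" %% "spark-hive" % "3.3.2",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2",
  "com.typesafe" % "config" % "1.4.2"
)

Running sbt evicted should also show which versions win when the dependency graph conflicts.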
I get this error:

java.lang.NoSuchMethodError: org.apache.hadoop.hive.common.FileUtils.mkdir(Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;ZLorg/apache/hadoop/conf/Configuration;)Z

I tried downgrading and upgrading some of the dependencies, but I can't tell which one is causing the issue.
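To narrow it down, I was planning to print which jar the class named in the error actually loads from (a small diagnostic sketch, run inside main before starting the query):

// Log the jar that supplies FileUtils at runtime; if it points at a
// Hive 3.1.3 jar instead of Spark's bundled Hive, that's the conflict.
val fileUtilsJar = Option(
  Class.forName("org.apache.hadoop.hive.common.FileUtils")
    .getProtectionDomain.getCodeSource
).map(_.getLocation)
println(s"FileUtils loaded from: $fileUtilsJar")

Is the mixed Hive version the likely culprit here, or is something else in the dependency list to blame?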