I have the following simple Kafka consumer that works with Confluent Kafka / Schema Registry and Avro-encoded topics. It works as expected with local[*]. However, once I submit it to a standalone cluster (Spark 3.0.1) using the following command:
/spark/bin/spark-submit \
--class ${SPARK_APPLICATION_MAIN_CLASS} \
--master ${SPARK_MASTER_URL} \
${SPARK_SUBMIT_ARGS} \
${SPARK_APPLICATION_JAR_LOCATION} ${SPARK_APPLICATION_ARGS}
where
- SPARK_MASTER_NAME=spark-master
- SPARK_MASTER_PORT=7077
- SPARK_APPLICATION_MAIN_CLASS=xxxx.AvroConsumer
- SPARK_APPLICATION_ARGS=-m spark://spark-master:7077 -b http://ipaddr:9092 -s http://ipaddr:8081 -t topic
- SPARK_SUBMIT_ARGS=--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1
I get the following error:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, 172.22.0.3, executor 0): org.apache.spark.SparkException: Failed to execute user defined function(AvroConsumer$$$Lambda$571/434329437: (binary) => string)
The build.sbt:
import _root_.sbt.Keys._
import _root_.sbt.Resolver
import _root_.sbt._

lazy val root = (project in file("."))
  .settings(
    name := "spark-examples",
    organization := "xxxx",
    version := "1.0",
    scalaVersion := "2.12.11",
    mainClass in Compile := Some("xxxx")
  )

libraryDependencies ++= {
  val spark = "org.apache.spark"
  val sparkV = "3.0.1"
  val confluent = "io.confluent"
  val confluentV = "6.0.0"

  Seq(
    spark %% "spark-core" % sparkV,
    spark %% "spark-sql" % sparkV,
    spark %% "spark-hive" % sparkV,
    spark %% "spark-streaming" % sparkV,
    spark %% "spark-streaming-kafka-0-10" % sparkV,
    spark %% "spark-sql-kafka-0-10" % sparkV,
    spark %% "spark-avro" % sparkV,
    spark %% "spark-mllib" % sparkV,
    confluent % "kafka-avro-serializer" % confluentV,
    confluent % "kafka-schema-registry-client" % confluentV,
    "org.apache.logging.log4j" % "log4j-api" % "2.14.0",
    "com.typesafe.play" %% "play-json" % "2.9.2",
    "com.fasterxml.jackson.core" % "jackson-databind" % "2.12.1",
    "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.12.1",
    "com.github.scopt" %% "scopt" % "4.0.0", // options parser
    "com.github.fommil.netlib" % "all" % "1.1.2" pomOnly()
    // "org.scalatest" %% "scalatest" % "2.2.1" % "test",
    // "xxxx" %% "spark-testing-base" % "0.0.1" % "test"
  )
}

resolvers ++= Seq(
  "Confluent Repository" at "https://packages.confluent.io/maven/",
  "Mulesoft Repository" at "https://repository.mulesoft.org/nexus/content/repositories/public/"
)

assemblyMergeStrategy in assembly := {
  case "reference.conf" => MergeStrategy.concat
  case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}
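In case the way the dependencies are packaged matters: the assembly currently bundles all of the Spark artifacts, even though the cluster provides Spark and --packages adds the Kafka connector at submit time. If those were meant to come from the cluster instead, the dependencies could presumably be scoped like this (a sketch only; I have not verified whether this is related to the error):

// Sketch only (not verified against the error): let the cluster and --packages
// supply Spark, and keep only the Confluent/serde classes in the assembly.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"           % "3.0.1" % "provided",
  "org.apache.spark" %% "spark-sql"            % "3.0.1" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.0.1" % "provided", // comes from --packages at submit time
  "org.apache.spark" %% "spark-avro"           % "3.0.1",              // not in the default Spark distribution, so kept in the fat jar
  "io.confluent"      % "kafka-avro-serializer"        % "6.0.0",      // stays in the fat jar
  "io.confluent"      % "kafka-schema-registry-client" % "6.0.0"
)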
And the Consumer code:
// https://stackoverflow.com/questions/48882723/integrating-spark-structured-streaming-with-the-confluent-schema-registry
// https://github.com/xebia-france/spark-structured-streaming-blog/blob/master/src/main/scala/AvroConsumer.scala#L28-L39
package xxxx
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.commons.cli.CommandLine
import org.apache.spark.sql._
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.streaming.OutputMode
/**
 * Consumer example with Spark dataframes and Confluent Kafka.
 */
object AvroConsumer {

  private var schemaRegistryClient: SchemaRegistryClient = _
  private var kafkaAvroDeserializer: AvroDeserializer = _

  def lookupTopicSchema(topic: String, isKey: Boolean = false): String = {
    schemaRegistryClient.getLatestSchemaMetadata(topic + (if (isKey) "-key" else "-value")).getSchema
  }

  def avroSchemaToSparkSchema(avroSchema: String): SchemaConverters.SchemaType = {
    SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
  }

  def main(args: Array[String]): Unit = {
    val cmd: CommandLine = parseArg(args)
    val master = cmd.getOptionValue("master", "local[*]")

    val spark = SparkSession.builder()
      .appName(AvroConsumer.getClass.getName)
      .master(master)
      .getOrCreate()

    val bootstrapServers = cmd.getOptionValue("bootstrap-server")
    val topic = cmd.getOptionValue("topic")
    val schemaRegistryUrl = cmd.getOptionValue("schema-registry")

    schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
    kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)

    spark.sparkContext.setLogLevel("ERROR")
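    // This registration produces the "(binary) => string" lambda that the
    // stack trace above points at (AvroConsumer$$$Lambda$...).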
    spark.udf.register("deserialize", (bytes: Array[Byte]) =>
      DeserializerWrapper.deserializer.deserialize(bytes)
    )

    val kafkaDataFrame = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      // .option("startingOffsets", "earliest")
      .option("startingOffsets", "latest")
      .load()

    val valueDataFrame = kafkaDataFrame.selectExpr("""deserialize(value) AS message""")

    import org.apache.spark.sql.functions._

    val dfValueSchema = {
      val rawSchema = lookupTopicSchema(topic)
      avroSchemaToSparkSchema(rawSchema)
    }

    val formattedDataFrame = valueDataFrame.select(
      from_json(col("message"), dfValueSchema.dataType).alias("parsed_value"))
      .select("parsed_value.*")

    formattedDataFrame
      .writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .option("truncate", false)
      .start()
      .awaitTermination()
  }
  // Holds the deserializer instance that the UDF closes over.
  object DeserializerWrapper {
    val deserializer: AvroDeserializer = kafkaAvroDeserializer
  }

  // Thin wrapper around Confluent's deserializer that returns the decoded
  // GenericRecord as a string (its JSON representation).
  class AvroDeserializer extends AbstractKafkaAvroDeserializer {
    def this(client: SchemaRegistryClient) {
      this()
      this.schemaRegistry = client
    }

    override def deserialize(bytes: Array[Byte]): String = {
      val genericRecord = super.deserialize(bytes).asInstanceOf[GenericRecord]
      genericRecord.toString
    }
  }

  private def parseArg(args: Array[String]): CommandLine = {
    import org.apache.commons.cli._

    val options = new Options

    val masterOption = new Option("m", "master", true, "Spark master")
    masterOption.setRequired(false)
    options.addOption(masterOption)

    val bootstrapOption = new Option("b", "bootstrap-server", true, "Bootstrap servers")
    bootstrapOption.setRequired(true)
    options.addOption(bootstrapOption)

    val topicOption = new Option("t", "topic", true, "Kafka topic")
    topicOption.setRequired(true)
    options.addOption(topicOption)

    val schemaRegOption = new Option("s", "schema-registry", true, "Schema Registry URL")
    schemaRegOption.setRequired(true)
    options.addOption(schemaRegOption)

    val parser = new BasicParser
    parser.parse(options, args)
  }
}
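The quoted error only names the generated lambda, not the underlying exception, so I could presumably wrap the UDF body to surface whatever actually fails on the executors. A debugging sketch only, reusing the names from the code above:

// Debugging sketch (not in the current code): rethrow with extra context so the
// executor-side cause shows up instead of just "Failed to execute user defined function".
spark.udf.register("deserialize", (bytes: Array[Byte]) =>
  try DeserializerWrapper.deserializer.deserialize(bytes)
  catch {
    case e: Throwable =>
      throw new RuntimeException(
        s"deserialize failed on executor; deserializer=${DeserializerWrapper.deserializer}", e)
  }
)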
What exactly is causing this error, considering that the same code runs locally without any errors?