I have the following simple Kafka consumer that works with Confluent Kafka/Schema Registry and Avro-based topics. It works as expected with local[*]. However, when I submit it to a standalone cluster (Spark 3.0.1) using the following command:

/spark/bin/spark-submit \
        --class ${SPARK_APPLICATION_MAIN_CLASS} \
        --master ${SPARK_MASTER_URL} \
        ${SPARK_SUBMIT_ARGS} \
        ${SPARK_APPLICATION_JAR_LOCATION} ${SPARK_APPLICATION_ARGS}

where

      - SPARK_MASTER_NAME=spark-master
      - SPARK_MASTER_PORT=7077
      - SPARK_APPLICATION_MAIN_CLASS=xxxx.AvroConsumer
      - SPARK_APPLICATION_ARGS=-m spark://spark-master:7077 -b http://ipaddr:9092 -s http://ipaddr:8081 -t topic
      - SPARK_SUBMIT_ARGS=--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1

I get the following error:

 Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, 172.22.0.3, executor 0): org.apache.spark.SparkException: Failed to execute user defined function(AvroConsumer$$$Lambda$571/434329437: (binary) => string)

The build.sbt:

import _root_.sbt.Keys._
import _root_.sbt.Resolver
import _root_.sbt._

lazy val root = (project in file("."))
  .settings(
    name := "spark-examples",
    organization := "xxxx",
    version := "1.0",
    scalaVersion := "2.12.11",
    mainClass in Compile := Some("xxxx")
  )

libraryDependencies ++= {
  val spark = "org.apache.spark"
  val sparkV = "3.0.1"

  val confluent = "io.confluent"
  val confluentV = "6.0.0"

  Seq(
    spark %% "spark-core"                 % sparkV,
    spark %% "spark-sql"                  % sparkV,
    spark %% "spark-hive"                 % sparkV,
    spark %% "spark-streaming"            % sparkV,
    spark %% "spark-streaming-kafka-0-10" % sparkV,
    spark %% "spark-sql-kafka-0-10"       % sparkV,
    spark %% "spark-avro"                 % sparkV,
    spark %% "spark-mllib"                % sparkV,
    confluent % "kafka-avro-serializer"   % confluentV,
    confluent % "kafka-schema-registry-client" % confluentV,
    "org.apache.logging.log4j" % "log4j-api" % "2.14.0",
    "com.typesafe.play" %% "play-json" % "2.9.2",
    "com.fasterxml.jackson.core" % "jackson-databind" % "2.12.1",
    "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.12.1",
    "com.github.scopt" %% "scopt" % "4.0.0", //OptinsParser
    "com.github.fommil.netlib" % "all" % "1.1.2" pomOnly()
  //    "org.scalatest" %% "scalatest" % "2.2.1" % "test",
//    "xxxx" %% "spark-testing-base" % "0.0.1" % "test"
  )
}

resolvers ++= Seq(
  "Confluent Repository" at "https://packages.confluent.io/maven/",
  "Mulesoft Repository" at "https://repository.mulesoft.org/nexus/content/repositories/public/"
)

assemblyMergeStrategy in assembly := {
  case "reference.conf" => MergeStrategy.concat
  case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
  case PathList("META-INF", xs@_*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}

And the Consumer code:

// https://stackoverflow.com/questions/48882723/integrating-spark-structured-streaming-with-the-confluent-schema-registry
// https://github.com/xebia-france/spark-structured-streaming-blog/blob/master/src/main/scala/AvroConsumer.scala#L28-L39
package xxxx

import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.commons.cli.CommandLine
import org.apache.spark.sql._
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.streaming.OutputMode

/**
 * Consumer example with Spark dataframes and Confluent Kafka.
 */
object AvroConsumer {
  private var schemaRegistryClient: SchemaRegistryClient = _
  private var kafkaAvroDeserializer: AvroDeserializer = _

  def lookupTopicSchema(topic: String, isKey: Boolean = false): String = {
    schemaRegistryClient.getLatestSchemaMetadata(topic + (if (isKey) "-key" else "-value")).getSchema
  }

  def avroSchemaToSparkSchema(avroSchema: String): SchemaConverters.SchemaType = {
    SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
  }

  def main(args: Array[String]): Unit = {
    val cmd: CommandLine = parseArg(args)
    val master = cmd.getOptionValue("master", "local[*]")
    val spark = SparkSession.builder()
      .appName(AvroConsumer.getClass.getName)
      .master(master)
      .getOrCreate()
    val bootstrapServers = cmd.getOptionValue("bootstrap-server")
    val topic = cmd.getOptionValue("topic")
    val schemaRegistryUrl = cmd.getOptionValue("schema-registry")

    schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
    kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)

    spark.sparkContext.setLogLevel("ERROR")

    spark.udf.register("deserialize", (bytes: Array[Byte]) =>
      DeserializerWrapper.deserializer.deserialize(bytes)
    )

    val kafkaDataFrame = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
//      .option("startingOffsets", "earliest")
      .option("startingOffsets", "latest")
      .load()
    val valueDataFrame = kafkaDataFrame.selectExpr("""deserialize(value) AS message""")

    import org.apache.spark.sql.functions._

    val dfValueSchema = {
      val rawSchema = lookupTopicSchema(topic)
      avroSchemaToSparkSchema(rawSchema)
    }
    val formattedDataFrame = valueDataFrame.select(
      from_json(col("message"), dfValueSchema.dataType).alias("parsed_value"))
      .select("parsed_value.*")

    formattedDataFrame
      .writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .option("truncate", false)
      .start()
      .awaitTermination()
  }

  object DeserializerWrapper {
    val deserializer: AvroDeserializer = kafkaAvroDeserializer
  }

  class AvroDeserializer extends AbstractKafkaAvroDeserializer {
    def this(client: SchemaRegistryClient) {
      this()
      this.schemaRegistry = client
    }

    override def deserialize(bytes: Array[Byte]): String = {
      val genericRecord = super.deserialize(bytes).asInstanceOf[GenericRecord]
      genericRecord.toString
    }
  }

  private def parseArg(args: Array[String]): CommandLine = {
    import org.apache.commons.cli._

    val options = new Options
    val masterOption = new Option("m", "master", true, "Spark master")
    masterOption.setRequired(false)
    options.addOption(masterOption)
    val bootstrapOption = new Option("b", "bootstrap-server", true, "Bootstrap servers")
    bootstrapOption.setRequired(true)
    options.addOption(bootstrapOption)
    val topicOption = new Option("t", "topic", true, "Kafka topic")
    topicOption.setRequired(true)
    options.addOption(topicOption)
    val schemaRegOption = new Option("s", "schema-registry", true, "Schema Registry URL")
    schemaRegOption.setRequired(true)
    options.addOption(schemaRegOption)
    val parser = new BasicParser
    parser.parse(options, args)
  }

}

What exactly is causing this error, considering that the same code runs locally without error?
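
For what it's worth, here is a minimal sketch of a workaround I am considering, assuming the root cause is that the registered UDF captures the driver-side `kafkaAvroDeserializer`/`DeserializerWrapper` state, which is never initialised in the executor JVMs (with local[*] the driver and executors share one JVM, so it happens to work). `ExecutorSideDeserializer` is a name of my own, defined alongside the existing `AvroDeserializer` inside the AvroConsumer object; only the schema registry URL (a plain String) travels with the closure, and the client is rebuilt lazily on each executor:

  class ExecutorSideDeserializer(registryUrl: String) extends Serializable {
    // Only the registry URL (a serializable String) is shipped with the closure;
    // the client and deserializer are built lazily in whichever JVM calls deserialize().
    @transient private lazy val deserializer: AvroDeserializer =
      new AvroDeserializer(new CachedSchemaRegistryClient(registryUrl, 128))

    def deserialize(bytes: Array[Byte]): String = deserializer.deserialize(bytes)
  }

  // In main(), replacing the DeserializerWrapper-based registration:
  val executorSideDeserializer = new ExecutorSideDeserializer(schemaRegistryUrl)
  spark.udf.register("deserialize", (bytes: Array[Byte]) =>
    executorSideDeserializer.deserialize(bytes)
  )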

  • Are you using the same input data when running in local mode and when running on the standalone cluster? – Michael Heil Jan 26 '21 at 19:33
  • Can you get more of the stack trace? The error suggests that `AvroDeserializer.deserialize` is not serializable across Spark workers (and it seems you copied my answer, where I did not test that code outside of a local master). – OneCricketeer Jan 26 '21 at 20:15
  • @mike Same topic, same Kafka server, etc. Literally the only change is from local[*] to the standalone cluster spark://server:7077. – Jesse Quinn Jan 27 '21 at 20:39
  • @OneCricketeer Yes, most of the code is from your example with some modifications. I will try to get a stack trace, but I have actually moved to `"za.co.absa" %% "abris" % "4.0.1"` and am scrapping the existing code (see the ABRiS sketch below). – Jesse Quinn Jan 27 '21 at 20:40
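
For reference, a minimal sketch of the ABRiS 4.x route mentioned in the last comment, assuming "za.co.absa" %% "abris" % "4.0.1" (together with spark-avro) is on the classpath; the topic name and registry URL are placeholders from the question:

  import org.apache.spark.sql.functions.col
  import za.co.absa.abris.avro.functions.from_avro
  import za.co.absa.abris.config.AbrisConfig

  // The reader schema is fetched from the Schema Registry for the "<topic>-value" subject.
  val abrisConfig = AbrisConfig
    .fromConfluentAvro
    .downloadReaderSchemaByLatestVersion
    .andTopicNameStrategy("topic")
    .usingSchemaRegistry("http://ipaddr:8081")

  // Replaces the deserialize UDF + from_json round trip from the question.
  val formattedDataFrame = kafkaDataFrame
    .select(from_avro(col("value"), abrisConfig).as("parsed_value"))
    .select("parsed_value.*")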
