
I am trying to build a use case with Kafka and Spark using Scala. I built a consumer and a producer with the Kafka client libraries, and now I am building the data processor that counts words with Spark. Here is my build.sbt:

name := """scala-akka-stream-kafka"""

version := "1.0"

// scalaVersion := "2.12.4"
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.kafka" %% "kafka" % "0.10.2.0",
  "org.apache.kafka" % "kafka-streams" % "0.10.2.0",
  "org.apache.spark" %% "spark-core" % "2.2.0",
  "org.apache.spark" %% "spark-streaming" % "2.2.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.0")

dependencyOverrides ++= Seq(
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.5",
  "com.fasterxml.jackson.core" % "jackson-module-scala" % "2.6.5")

resolvers ++= Seq(
 "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"
)

resolvers += Resolver.sonatypeRepo("releases")

My word-count data processor fails with an error that Spark reports against the line `val wordMap = words.map(word => (word, 1))`:

package com.spark.streams

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Durations, StreamingContext}

import scala.collection.mutable

object WordCountSparkStream extends App {

  val kafkaParam = new mutable.HashMap[String, String]()
  kafkaParam.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  kafkaParam.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
  kafkaParam.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
  kafkaParam.put(ConsumerConfig.GROUP_ID_CONFIG, "group1")
  kafkaParam.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
  kafkaParam.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")

  val conf = new SparkConf().setMaster("local[2]").setAppName("WordCountSparkStream")
  // Read messages in batch of 5 seconds
  val sparkStreamingContext = new StreamingContext(conf, Durations.seconds(5))

  //Configure Spark to listen for messages on the input topic
  val topicList = List("streams-plaintext-input")

  // Read value of each message from Kafka and return it
  val messageStream = KafkaUtils.createDirectStream(sparkStreamingContext,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topicList, kafkaParam))
  val lines = messageStream.map(consumerRecord => consumerRecord.value().asInstanceOf[String])

  // Break every message into words and return list of words
  val words = lines.flatMap(_.split(" "))

  // Take every word and return Tuple with (word,1)
  val wordMap = words.map( word => (word, 1))

  // Count occurrences of each word
  val wordCount = wordMap.reduceByKey((first, second) => first + second)

  //Print the word count
  wordCount.print()

  sparkStreamingContext.start()
  sparkStreamingContext.awaitTermination()

  // "streams-wordcount-output"
}

This is not a compilation error, and it does not look like a library conflict either. The error says the task cannot be deserialized, yet I am using the String deserializer, which matches what my producer sends.
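
For reference, the producer side only sends plain strings to the input topic with the String serializer. A minimal sketch of such a producer (illustrative only, not my exact code; it assumes the same broker and topic as above):

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object WordCountProducerSketch extends App {
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

  // Plain-text lines go to the topic the Spark job subscribes to
  val producer = new KafkaProducer[String, String](props)
  producer.send(new ProducerRecord[String, String]("streams-plaintext-input", "the quick brown fox"))
  producer.close()
}

Here is the log from the failing run:
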

17/12/12 17:02:50 INFO DAGScheduler: Submitting 8 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCountSparkStream.scala:37) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7))
17/12/12 17:02:50 INFO TaskSchedulerImpl: Adding task set 0.0 with 8 tasks
17/12/12 17:02:50 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 4710 bytes)
17/12/12 17:02:50 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 4710 bytes)
17/12/12 17:02:50 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/12/12 17:02:50 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
17/12/12 17:02:50 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.ClassNotFoundException: scala.None$
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1863)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1746)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2037)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:309)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
  • Don't see anything obvious that would lead to the error so...can you clean up your `build.sbt` a bit that may help us with the issue? Remove all `"org.apache.kafka"` and `"spark-core"` dependencies + `spark-streaming-kafka-0-10` dependency should be `2.2.0`. BTW, Why don't you use Spark Structured Streaming --> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html?! – Jacek Laskowski Dec 12 '17 at 16:54
  • I removed `"org.apache.spark" %% "spark-core" % "2.2.0"` but not `"org.apache.kafka" %% "kafka" % "0.10.2.0"`, because I am using it in my producer and consumer. I also changed the version of spark-streaming-kafka to 2.2.0. The error remains the same; the end of the exception says `Caused by: org.apache.spark.SparkException: Could not find HeartbeatReceiver`. I guess I am going to read the tutorial you sent me. – Felipe Dec 12 '17 at 17:32
  • How do you run the application? Can you include the entire stacktrace in your question? – Jacek Laskowski Dec 12 '17 at 18:55
  • Sure. I opened an issue to myself at https://github.com/felipegutierrez/scala-akka-stream-kafka/issues/1 – Felipe Dec 12 '17 at 21:03
  • Can you assemble your Spark application and run using `spark-submit`? I suspect the latest version of sbt 1.0.x may cause the issue (seen two other questions on SO with a very similar exception). – Jacek Laskowski Dec 13 '17 at 17:42
  • I've never used spark-submit. I am looking for some good quick start to learn how to use spark-submit. – Felipe Dec 13 '17 at 22:19
  • Ok, found a workaround -- downgrade sbt and use sbt < 1.0, i.e. sbt 0.13.6. That is supposed to put the issue aside until it's fixed. – Jacek Laskowski Dec 14 '17 at 07:05
  • How do you use `dependencyOverrides` for `sbt -sbt-version 0.13.6`? I am getting an error: `error: eof expected but ';' found.` when I use `dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-module-scala" % "2.6.5"` – Felipe Dec 14 '17 at 11:15
  • OK, that was a simple mistake: a blank line is needed between the sbt expressions, see https://stackoverflow.com/questions/18432683/error-eof-expected-how-to-use-idea-and-eclipse-plugins-together-in-sbt. I am trying to run with sbt 0.13.6 as you suggested. – Felipe Dec 14 '17 at 11:19
  • still the same error. I am going to spend time learning about `spark-submit` to debug the app. – Felipe Dec 14 '17 at 11:22
  • still the same error. According to the error, my spark streaming `Could not deserialize TaskEndReason: ClassNotFound with classloader ClasspathFilter(...` – Felipe Dec 14 '17 at 11:49
  • Let's switch to github and be back after we figured out what's wrong. More space to talk. – Jacek Laskowski Dec 14 '17 at 13:06
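
For reference, the Structured Streaming approach suggested in the first comment would look roughly like the sketch below. It is only a sketch: it assumes `spark-sql` and `spark-sql-kafka-0-10` 2.2.0 are added to the dependencies, and I have not run it against this exact setup.

import org.apache.spark.sql.SparkSession

object WordCountStructuredStream extends App {
  val spark = SparkSession.builder()
    .master("local[2]")
    .appName("WordCountStructuredStream")
    .getOrCreate()
  import spark.implicits._

  // Read the topic as an unbounded table; the value column arrives as binary, so cast it to String
  val lines = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "streams-plaintext-input")
    .load()
    .selectExpr("CAST(value AS STRING)")
    .as[String]

  // Split each line into words and keep a running count per word
  val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

  // Print the running counts to the console on every trigger
  val query = wordCounts.writeStream
    .outputMode("complete")
    .format("console")
    .start()

  query.awaitTermination()
}
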

1 Answer


Try this in your build.sbt:

fork := true

It works for me, though I can't say exactly why.

Hall Wong
  • check this out: https://stackoverflow.com/questions/44298847/why-do-we-need-to-add-fork-in-run-true-when-running-spark-sbt-application – Hall Wong Feb 26 '18 at 07:41
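
As I understand the linked question, the likely cause is that sbt, by default, runs the application inside its own JVM with its own classloader hierarchy, so classes such as scala.None$ may not be visible to the classloader Spark's JavaSerializer uses when it deserializes tasks on executor threads. Forking runs the app in a plain JVM with the full application classpath, which seems to be why the error disappears. A minimal sketch of the relevant build.sbt line (the scoped form below is equivalent to the bare `fork := true` above):

// build.sbt
// Run the application in a forked JVM instead of inside sbt's own JVM,
// so Spark's task deserialization sees the full application classpath
fork in run := true
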