
I'm writing a Spark Structured Streaming app that reads from and writes to Kafka. I've been trying to build a fat jar (using sbt-assembly) for the application so that I can submit it to the Spark cluster with the spark-submit script.

However, the application keeps failing with the following error:

Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:549)
    at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:86)
    at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:86)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:195)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
    ... 6 more
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21$$anonfun$apply$12.apply(DataSource.scala:533)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21$$anonfun$apply$12.apply(DataSource.scala:533)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21.apply(DataSource.scala:533)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21.apply(DataSource.scala:533)
    at scala.util.Try.orElse(Try.scala:84)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:533)

Questions

  1. In the documentation, I saw that the spark-submit script needs to be called with the --packages parameter. What does this parameter do?

  2. As far as I understand, the --packages parameter downloads the specified package and its associated dependencies from the Maven repo. If this is correct, is there a way to avoid this submission-time dependency on a remote repo and to create a fat jar that has all the dependencies embedded in it?

Following are the relevant portions of the build.sbt file:

  .....
  "org.apache.kafka" %% "kafka" % "0.10.1.0",
  "org.apache.kafka" % "kafka-clients" % "0.10.1.0",
  "org.apache.spark" %% "spark-core" % SPARK_VERSION % "provided",
  "org.apache.spark" %% "spark-mllib" % SPARK_VERSION,
  "org.apache.spark" %% "spark-sql" % SPARK_VERSION % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % SPARK_VERSION,
  "org.apache.spark" % "spark-streaming_2.10" % SPARK_VERSION % "provided",
  ("org.apache.spark" %% "spark-streaming-kafka-0-10" % SPARK_VERSION).exclude("org.spark-project.spark","unused"),
  ......

EDIT: Here's my entire build.sbt file:

name := "abc"
organization := "com.myorg"
scalaVersion := "2.11.11"
test in assembly := {}
version := "v1"

val SPARK_VERSION = "2.2.0"

libraryDependencies ++= Seq(
  "com.github.scopt" %% "scopt" % "3.5.0",
  "com.metamx" % "java-util" % "0.27.10" % "provided",
  "com.twitter" %% "bijection-scrooge" % "0.9.4",
  "com.twitter" %% "finagle-thrift" % "6.35.0",
  "com.twitter" %% "util-collection" % "6.34.0",
  "com.typesafe.play" %% "play-json" % "2.3.4",
  "io.druid" % "druid" % "0.9.1.1" % "provided",
  "io.druid" % "druid-api" % "0.9.1.1" % "provided",
  "jline" % "jline" % "2.9",
  "org.apache.httpcomponents" % "httpclient" % "4.5.3",
  "org.apache.kafka" %% "kafka" % "0.10.1.0",
  "org.apache.kafka" % "kafka-clients" % "0.10.1.0",
  "org.apache.spark" %% "spark-core" % SPARK_VERSION % "provided",
  "org.apache.spark" %% "spark-mllib" % SPARK_VERSION,
  "org.apache.spark" %% "spark-sql" % SPARK_VERSION % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % SPARK_VERSION,
  "org.apache.spark" %% "spark-streaming" % SPARK_VERSION % "provided",
  ("org.apache.spark" %% "spark-streaming-kafka-0-10" % SPARK_VERSION).exclude("org.spark-project.spark","unused"),
  "org.clapper" %% "grizzled-slf4j" % "1.3.1",
  "org.scalatest" %% "scalatest" % "2.2.4" % "it,test",
  "org.slf4j" % "slf4j-api" % "1.7.5",
  "org.slf4j" % "slf4j-simple" % "1.7.5"
)

// Ref: https://github.com/holdenk/spark-testing-base
libraryDependencies += "com.holdenkarau" %% "spark-testing-base" % "2.0.0_0.4.7" % "test"

resolvers ++= Seq(
  "Job Server Bintray" at "https://dl.bintray.com/spark-jobserver/maven",
  "Typesafe repo" at "http://repo.typesafe.com/typesafe/releases/",
  Resolver.sonatypeRepo("public")
)

val buildSettings = Defaults.defaultSettings ++ Seq(
  javaOptions += "-Xmx2G",
  javaOptions += "-XX:+UseCompressedOops"
)

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "services", "io.druid.initialization.DruidModule") => MergeStrategy.last
  case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
  case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
  case PathList("com", "google", xs @ _*) => MergeStrategy.last
  case PathList("io", "netty", xs @ _*) => MergeStrategy.last
  case PathList("javax", "inject", xs @ _*) => MergeStrategy.last
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
  case PathList("javax", "xml", xs @ _*) => MergeStrategy.last
  case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
  case PathList("org", "apache", xs @ _*) => MergeStrategy.last
  case PathList("org", "apache", "avro", xs @ _*) => MergeStrategy.last
  case PathList("org", "fusesource", xs @ _*) => MergeStrategy.last
  case PathList("org", "slf4j", xs @ _*) => MergeStrategy.last
  case PathList("org", "tachyonproject", xs @ _*) => MergeStrategy.last
  case PathList("scala", xs @ _*) => MergeStrategy.last
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.last
}

lazy val scalatest = "org.scalatest" %% "scalatest" % "2.2.4"
lazy val IntegrationTest = config("it") extend(Test)
def itFilter(name: String): Boolean = name endsWith "ITest"

lazy val root = (project in file("."))
  .configs(IntegrationTest)
  .settings(inConfig(IntegrationTest)(Defaults.itSettings) : _*)
  .settings(
    libraryDependencies += scalatest % "it,test",
    parallelExecution in Test := false,
    javaOptions in Test += s"""-Djava.library.path=${baseDirectory.value / "src/test/resources/lib_pcap"}""",
    fork in Test := true,
    parallelExecution in IntegrationTest := false,
    testOptions in IntegrationTest := Seq(Tests.Filter(itFilter)))

Thanks in advance.

zero323
jithinpt
  • Can you show the entire `build.sbt`? Can you check if the uber-jar has `KafkaSource` and `KafkaSourceProvider` inside? – Jacek Laskowski Sep 22 '17 at 06:46
  • Hi @JacekLaskowski, I've edited my question above to include the entire `build.sbt` file. Also, the uber-jar I'm creating does have both the `KafkaSource` and `KafkaSourceProvider` classes inside. I verified this using `jar tf ` and grepping for these class names. – jithinpt Sep 22 '17 at 18:50
  • @JacekLaskowski Adding another comment, since I couldn't highlight your name in the previous one. Thanks. – jithinpt Sep 22 '17 at 23:31

1 Answer


needs to be called with the --packages parameter. What does this parameter do?

It places the package's jar and its transitive dependencies on the CLASSPATH, downloading them with Ivy if they are not available in the local artifact repository.

As far as I could understand, the --packages parameter downloads the specified package and its associated dependencies from the maven repo. If this is correct,...

That's correct.

is there a way to avoid this submission-time dependency on a remote repo and to create a fat jar that has all the dependencies embedded in it?

Yes, indeed!

That's where sbt-assembly comes on stage (no pun intended). Make sure that build.sbt is in proper shape and build the fat jar with it.
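
One thing worth checking in a build.sbt like the one in the question (a guess based on the stack trace, not a confirmed diagnosis): Spark resolves the short name kafka through Java's ServiceLoader, which reads META-INF/services/org.apache.spark.sql.sources.DataSourceRegister from the jar. The merge strategy in the question discards everything under META-INF except the Druid service file, so that registration would be dropped even though the KafkaSourceProvider class itself is present. Below is a minimal sketch of a merge strategy that keeps service registrations, assuming sbt-assembly and the same `in assembly` syntax used in the question. The remaining cases from the original file would still need to be carried over.

```scala
// build.sbt fragment (sbt-assembly): a sketch, not a drop-in replacement.
assemblyMergeStrategy in assembly := {
  // Keep service registrations and merge duplicates line by line, so that
  // META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
  // (which lists the Kafka source provider) survives in the fat jar.
  case PathList("META-INF", "services", xs @ _*) => MergeStrategy.filterDistinctLines
  // Everything else under META-INF (manifests, signatures, ...) is dropped,
  // as in the question's build file.
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  // Fallback taken from the question: on conflict, keep the last copy.
  case _ => MergeStrategy.last
}
```

The order of the cases matters, because the first matching pattern wins: the services case has to come before the general META-INF case. After rebuilding, listing the jar contents with jar tf should show the META-INF/services/org.apache.spark.sql.sources.DataSourceRegister entry.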

Jacek Laskowski
  • Hi @JacekLaskowski, thanks for the answer. I've shared my build.sbt file above, but the Spark app in the uber-jar (created using `sbt assembly`) fails when I submit it to the Spark cluster. The error is shown in the question. – jithinpt Sep 22 '17 at 18:51