
The application works when it is submitted with spark-submit.

But when I try to submit it programmatically with the command below

mvn exec:java -Dexec.mainClass="org.cybergen.SubmitJobExample" -Dexec.args="/opt/spark/current/README.md Please"

I get the following error:

Application Log

15/05/12 17:19:46 INFO AppClient$ClientActor: Connecting to master spark://cyborg:7077...
15/05/12 17:19:46 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkMaster@cyborg:7077] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
15/05/12 17:20:06 INFO AppClient$ClientActor: Connecting to master spark://cyborg:7077...
15/05/12 17:20:06 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkMaster@cyborg:7077] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

Spark Master Log

15/05/12 17:33:22 ERROR EndpointWriter: AssociationError [akka.tcp://sparkMaster@cyborg:7077] <- [akka.tcp://sparkDriver@10.18.26.116:49592]: Error [org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 7674242335164700840, local class serialVersionUID = 2596819202403185464] [
java.io.InvalidClassException: org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 7674242335164700840, local class serialVersionUID = 2596819202403185464
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
    at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
    at scala.util.Try$.apply(Try.scala:161)
    at akka.serialization.Serialization.deserialize(Serialization.scala:98)
    at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:63)
    at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
    at scala.util.Try$.apply(Try.scala:161)
    at akka.serialization.Serialization.deserialize(Serialization.scala:98)
    at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
    at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
    at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
    at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
    at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
]
15/05/12 17:33:22 INFO Master: akka.tcp://sparkDriver@10.18.26.116:49592 got disassociated, removing it.
15/05/12 17:33:22 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@10.18.26.116:49592] has failed, address is now gated for [5000] ms. Reason is: [org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 7674242335164700840, local class serialVersionUID = 2596819202403185464].
15/05/12 17:33:22 INFO LocalActorRef: Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://sparkMaster/deadLetters] to Actor[akka://sparkMaster/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FsparkDriver%4010.18.26.116%3A49592-6/endpointWriter/endpointReader-akka.tcp%3A%2F%2FsparkDriver%4010.18.26.116%3A49592-0#1749840468] was not delivered. [10] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
15/05/12 17:33:22 INFO LocalActorRef: Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://sparkMaster/deadLetters] to Actor[akka://sparkMaster/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%40127.0.0.1%3A50366-7#-1224275483] was not delivered. [11] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
15/05/12 17:33:42 ERROR EndpointWriter: AssociationError [akka.tcp://sparkMaster@cyborg:7077] <- [akka.tcp://sparkDriver@10.18.26.116:49592]: Error [org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 7674242335164700840, local class serialVersionUID = 2596819202403185464] [
java.io.InvalidClassException: org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 7674242335164700840, local class serialVersionUID = 2596819202403185464
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)

SparkTestJob is the Spark job class:

import org.apache.spark.{SparkConf, SparkContext}

class SparkTestJob(val filePath: String = "", val filter: String = "") extends Serializable {
  def runWordCount(): Long = {
    val conf = new SparkConf()
      .setAppName("word count for the word " + filter)
      .setMaster("spark://cyborg:7077")
      .setJars(Seq("/tmp/spark-example-1.0-SNAPSHOT-driver.jar"))
      .setSparkHome("/opt/spark/current")
    val sc = new SparkContext(conf)
    val file = sc.textFile(filePath)
    // Count the lines that contain the filter word
    file.filter(line => line.contains(filter)).count()
  }
}

SubmitJobExample is the object that instantiates the SparkTestJob class:

object SubmitJobExample {

  def main(args: Array[String]): Unit = {
    if (args.length == 2) {
      val fileName = args(0)
      val filterByWord = args(1)
      println("Reading file " + fileName + " for word " + filterByWord)
      val jobObject = new SparkTestJob(fileName, filterByWord)
      println("word count for the file " + fileName + " is " + jobObject.runWordCount())
    } else {
      val jobObject = new SparkTestJob("/opt/spark/current/README.md", "Please")
      println("word count for the file /opt/spark/current/README.md is " + jobObject.runWordCount())
    }
  }
}
Vishnu667

2 Answers


Your code looks fine to me. To debug serialization problems, run with -Dsun.io.serialization.extendedDebugInfo=true. This prints extra output on a NotSerializableException, so you can see what it is trying to serialize.
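One detail when launching through exec:java: the driver runs inside Maven's own JVM, so the flag has to reach that JVM via MAVEN_OPTS rather than -Dexec.args. A sketch, assuming the default exec-maven-plugin configuration:

```shell
# exec:java runs the main class inside Maven's JVM, so JVM flags go in
# MAVEN_OPTS (assumption: default exec-maven-plugin configuration)
export MAVEN_OPTS="-Dsun.io.serialization.extendedDebugInfo=true"
# then re-run the original command:
#   mvn exec:java -Dexec.mainClass="org.cybergen.SubmitJobExample" \
#     -Dexec.args="/opt/spark/current/README.md Please"
echo "$MAVEN_OPTS"
```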

Daniel Darabos
  • You mean the method above is feasible? – yjshen May 12 '15 at 10:15
  • I don't see anything wrong with the code in the question. There's probably something wrong with it, or it would work :). But instead of looking at it more closely, Vishnu667 should just run it with the debug flag. This is useful for debugging similar problems in the future as well. – Daniel Darabos May 12 '15 at 10:31
  • @DanielDarabos After adding Serializable to the SparkTestJob class, I got rid of the serializable error. I've updated the question with the current error it is throwing. – Vishnu667 May 12 '15 at 11:58
  • Looks like it cannot connect to the master now. Perhaps the master is not running. It's probably a good idea to use `spark-submit` unless you know what you're doing. – Daniel Darabos May 12 '15 at 12:04
  • @DanielDarabos Yes, I know what I'm doing, and that's the reason for the question: how to submit a Spark job programmatically. I've added the Spark master's log to the question too; the app is connecting to the Spark master. – Vishnu667 May 12 '15 at 12:16
  • I think it's telling you that the code that serialized the data is not the same as the code that is deserializing it. Did you rebuild `/tmp/spark-example-1.0-SNAPSHOT-driver.jar` after the last change? – Daniel Darabos May 12 '15 at 12:45
  • @DanielDarabos Yes, I did rebuild it and tried executing again; same error. – Vishnu667 May 12 '15 at 16:53
  • @DanielDarabos The issue was a version mismatch in the dependencies. – Vishnu667 May 12 '15 at 19:11

The actual problem was a Spark version mismatch introduced by one of the dependencies. Changing all the dependencies to the same Spark version fixed the problem.
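One way to keep the versions aligned is to declare the Spark version once as a Maven property and reference it from every Spark dependency. A sketch (the version number and module list here are examples; use whatever your cluster runs):

```
<properties>
  <spark.version>1.3.1</spark.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <!-- every other Spark module reuses the same ${spark.version} property -->
</dependencies>
```

After the change, `mvn dependency:tree -Dincludes=org.apache.spark` shows the resolved artifacts, so you can confirm that only one Spark version remains on the classpath.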

The reason it worked with spark-submit is Java's JAR classpath precedence, which picked up the correct Spark jar version.

Vishnu667