From a Spark Java app submitted to a Spark cluster hosted on my machine, I am trying to connect to a Cassandra DB hosted on the same machine at 127.0.0.1:9042, and my Spring Boot application fails to start.
Approach 1 -
**Based on the Spark-Cassandra-Connector link, I included the following in the POM file:**
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>2.0.0-M3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
Approach 1 - NoSuchMethodError - Log File:
16/09/08 15:12:50 ERROR SpringApplication: Application startup failed
java.lang.NoSuchMethodError: com.datastax.driver.core.KeyspaceMetadata.getMaterializedViews()Ljava/util/Collection;
at com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchTables$1(Schema.scala:281)
at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:305)
at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:304)
at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:683)
at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:316)
at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:972)
at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:972)
at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:682)
at com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1(Schema.scala:304)
at com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:325)
at com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:322)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:122)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:121)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:121)
at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:322)
at com.datastax.spark.connector.cql.Schema$.tableFromCassandra(Schema.scala:342)
at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.tableDef(CassandraTableRowReaderProvider.scala:50)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef$lzycompute(CassandraTableScanRDD.scala:60)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef(CassandraTableScanRDD.scala:60)
at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:137)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:60)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:232)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:875)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:873)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:873)
at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:350)
at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)
at com.initech.myapp.cassandra.service.CassandraDataService.getMatches(CassandraDataService.java:45)
at com.initech.myapp.processunit.MySparkApp.receive(MySparkApp.java:120)
at com.initech.myapp.processunit.MySparkApp.process(MySparkApp.java:61)
at com.initech.myapp.processunit.MySparkApp.run(MySparkApp.java:144)
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:789)
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:779)
at org.springframework.boot.SpringApplication.afterRefresh(SpringApplication.java:769)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:314)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1185)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1174)
at com.initech.myapp.MySparkAppBootApp.main(MyAppProcessingUnitsApplication.java:20)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:87)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:50)
at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
16/09/08 15:12:50 INFO AnnotationConfigApplicationContext: Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@3381b4fc: startup date [Thu Sep 08 15:12:40 PDT 2016]; root of context hierarchy
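For what it's worth, a NoSuchMethodError on KeyspaceMetadata.getMaterializedViews() suggests that an older cassandra-driver-core (pre-3.0, which has no materialized-view metadata) is winning on the classpath over the 3.x driver that connector 2.0.0-M3 expects. A quick check like the following (just a sketch; DriverVersionCheck is my own name) shows which driver version and jar are actually loaded:

import com.datastax.driver.core.Cluster;

public class DriverVersionCheck {
    public static void main(String[] args) {
        // Reports the DataStax Java driver version that is actually on the classpath.
        System.out.println("Driver version: " + Cluster.getDriverVersion());
        // Shows which jar the Cluster class was loaded from.
        System.out.println("Loaded from: "
                + Cluster.class.getProtectionDomain().getCodeSource().getLocation());
    }
}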
Approach 2 -
**Since what I am developing is a Java Spark app, I thought of using the Spark-Cassandra-Connector-Java and included the following in the POM file:**
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>2.0.0-M3</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java_2.11</artifactId>
    <version>1.2.6</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
and ended up with this:
Approach 2 - SelectableColumnRef NoClassDefFoundError - Log File:
16/09/08 16:28:07 ERROR SpringApplication: Application startup failed
java.lang.NoClassDefFoundError: com/datastax/spark/connector/SelectableColumnRef
at com.initech.myApp.cassandra.service.CassandraDataService.getMatches(CassandraDataService.java:41)
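I suspect the version mix here: as far as I can tell, the separate spark-cassandra-connector-java artifact only exists for the 1.x line (its Java API lives inside the main spark-cassandra-connector artifact in 2.0.x), so the 1.2.6 -java artifact references classes such as SelectableColumnRef that 2.0.0-M3 no longer ships. A one-off probe like this (ConnectorClassProbe is my own name) confirms whether the class is visible at all:

public class ConnectorClassProbe {
    public static void main(String[] args) throws ClassNotFoundException {
        // Throws ClassNotFoundException when no jar on the classpath defines the class,
        // which is what happens when only the 2.0.x connector jar is present.
        Class.forName("com.datastax.spark.connector.SelectableColumnRef");
        System.out.println("SelectableColumnRef is on the classpath");
    }
}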
**My Spark main method calls the process() method below:**
public boolean process() throws InterruptedException {
    logger.debug("In the process() method");

    SparkConf sparkConf = new SparkConf().setAppName("My Process Unit");
    sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");
    sparkConf.set("spark.cassandra.connection.port", "9042");
    logger.debug("SparkConf = " + sparkConf);

    JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, new Duration(1000));
    logger.debug("JavaStreamingContext = " + javaStreamingContext);

    JavaSparkContext javaSparkContext = javaStreamingContext.sparkContext();
    logger.debug("Java Spark context = " + javaSparkContext);

    JavaRDD<MyData> myDataJavaRDD = receive(javaSparkContext);
    // foreach is an action, so this runs eagerly here, before the streaming
    // context starts; this is where the failure surfaces during startup.
    myDataJavaRDD.foreach(myData -> {
        logger.debug("myData = " + myData);
    });

    javaStreamingContext.start();
    javaStreamingContext.awaitTermination();
    return true;
}
**which calls the receive() below:**
private JavaRDD<MyData> receive(JavaSparkContext javaSparkContext) {
    logger.debug("receive method called...");
    List<String> myAppConfigsStrings = myAppConfiguration.get();
    logger.debug("Received ..." + myAppConfigsStrings);

    for (String myAppConfigStr : myAppConfigsStrings) {
        ObjectMapper mapper = new ObjectMapper();
        MyAppConfig myAppConfig;
        try {
            logger.debug("Parsing the myAppConfigStr..." + myAppConfigStr);
            myAppConfig = mapper.readValue(myAppConfigStr, MyAppConfig.class);
            logger.debug("Parse Complete...");

            // Check for matching data in Cassandra
            JavaRDD<MyData> cassandraRowsRDD = cassandraDataService.getMatches(myAppConfig, javaSparkContext);
            cassandraRowsRDD.foreach(myData -> {
                logger.debug("myData = " + myData);
            });
            // NOTE: returns after the first successfully parsed config.
            return cassandraRowsRDD;
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    return null;
}
**eventually calling the Cassandra data service getMatches() below:**
@Service
public class CassandraDataService implements Serializable {

    private static final Log logger = LogFactory.getLog(CassandraDataService.class);

    // Static imports needed for the calls below:
    // import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
    // import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;

    public JavaRDD<MyData> getMatches(MyAppConfig myAppConfig, JavaSparkContext javaSparkContext) {
        logger.debug("Creating the MyDataID...");
        MyDataID myDataID = new MyDataID();
        myDataID.set...(myAppConfig.get...);
        myDataID.set...(myAppConfig.get...);
        myDataID.set...(myAppConfig.get...);
        logger.debug("MyDataID = " + myDataID);

        JavaRDD<MyData> cassandraRowsRDD =
                javaFunctions(javaSparkContext).cassandraTable("myKeySpace", "myData", mapRowTo(MyData.class));
        cassandraRowsRDD.foreach(myData -> {
            logger.debug("====== Cassandra Data Service ========");
            logger.debug("myData = " + myData);
            logger.debug("====== Cassandra Data Service ========");
        });
        return cassandraRowsRDD;
    }
}
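One thing worth noting for mapRowTo(MyData.class): the row mapper expects MyData to be a serializable JavaBean with a no-arg constructor and setters whose property names line up with the column names of the myData table. A minimal sketch (the id column is hypothetical, since the real fields are elided above):

public class MyData implements java.io.Serializable {
    private String id; // hypothetical column "id" in the myData table

    public MyData() {
        // no-arg constructor required by the bean-based row mapper
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    @Override
    public String toString() { return "MyData{id=" + id + "}"; }
}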
Has anyone experienced a similar error, or could anyone point me in the right direction? I have tried googling and reading through several related posts, but none resolved this. Thanks.
Update 9/9/2016 2:15 PM PST
I tried the approach above. Here is what I have done:
- Spark cluster running with 1 worker thread
- Submitted my Spark app as a Spring Boot uber jar using the spark-submit command below:
./bin/spark-submit --class org.springframework.boot.loader.JarLauncher --master spark://localhost:6066 --deploy-mode cluster /Users/apple/Repos/Initech/Officespace/target/my-spring-spark-boot-streaming-app-0.1-SNAPSHOT.jar
The Spark driver program started successfully and initiated my Spark app, which was set to the "WAITING" state, since the only running worker had been allocated to the driver program.
- I then started another worker, and then the app's worker failed because of "java.lang.ClassNotFoundException: com.datastax.spark.connector.rdd.partitioner.CassandraPartition". The stack trace is below.
If it is useful in any way, here is the stack I am using:
1. cqlsh 5.0.1 | Cassandra 2.2.7 | CQL spec 3.3.1
2. Spark - 2.0.0
3. Spring Boot - 1.4.0.RELEASE
4. Jars listed in Approach 1 above
Exception Stack Trace:
16/09/09 14:13:24 ERROR SpringApplication: Application startup failed
java.lang.IllegalStateException: Failed to execute ApplicationRunner
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:792)
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:779)
at org.springframework.boot.SpringApplication.afterRefresh(SpringApplication.java:769)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:314)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1185)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1174)
at com.initech.officespace.MySpringBootSparkApp.main(MySpringBootSparkApp.java:23)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:87)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:50)
at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, 192.168.0.30): java.lang.ClassNotFoundException: com.datastax.spark.connector.rdd.partitioner.CassandraPartition
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:875)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:873)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:873)
at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:350)
at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)
at com.initech.officespace.cassandra.service.CassandraDataService.getMatches(CassandraDataService.java:43)
at com.initech.officespace.processunit.MyApp.receive(MyApp.java:120)
at com.initech.officespace.processunit.MyApp.process(MyApp.java:61)
at com.initech.officespace.processunit.MyApp.run(MyApp.java:144)
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:789)
... 20 more
Caused by: java.lang.ClassNotFoundException: com.datastax.spark.connector.rdd.partitioner.CassandraPartition
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/09/09 14:13:24 INFO AnnotationConfigApplicationContext: Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@3381b4fc: startup date [Fri Sep 09 14:10:40 PDT 2016]; root of context hierarchy
Update 2 on 9/9/2016 3:20 PM PST
The issue is now resolved, based on the answer provided by RussS at Issues with datastax spark-cassandra connector.
After updating my spark-submit command as below, I am seeing that the worker is able to pick up the connector and start working on the RDDs :) The --packages option has spark-submit resolve the connector and its dependencies from Maven and ship them to the driver and executors as plain jars, which, as far as I understand, works around the executors being unable to load classes from inside the Spring Boot nested-jar layout.
./bin/spark-submit --class org.springframework.boot.loader.JarLauncher --master spark://localhost:6066 --deploy-mode cluster --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3 /Users/apple/Repos/Initech/Officespace/target/my-spring-spark-boot-streaming-app-0.1-SNAPSHOT.jar
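A trivial probe job like the following can confirm that executors now see the connector classes (a sketch; probeExecutors is my own name):

private void probeExecutors(JavaSparkContext javaSparkContext) {
    // The foreach body runs on an executor, not the driver; Class.forName
    // throws there if the connector classes are still not visible to workers.
    javaSparkContext.parallelize(java.util.Arrays.asList(1)).foreach(i ->
            Class.forName("com.datastax.spark.connector.rdd.partitioner.CassandraPartition"));
}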