I have a Java microservice that connects to an Apache Spark cluster and uses the DataStax Spark Cassandra Connector to persist data to an Apache Cassandra cluster.
I have written the following method to delete data from a Cassandra table for a specific date range.
The exact code is shown below:
public void deleteData(String fromDate, String toDate) {
    SparkConf conf = sparkSession.sparkContext().getConf();
    CassandraConnector connector = CassandraConnector.apply(conf);

    Dataset<Row> df = sparkSession.read()
            .format("org.apache.spark.sql.cassandra")
            .options(new HashMap<String, String>() {{
                put("keyspace", CassandraProperties.KEYSPACE);
                put("table", CassandraProperties.ENERGY_FORECASTS);
            }})
            .load()
            .filter(col("timestamp")
                    .substr(1, 10)
                    .between(fromDate, toDate))
            .select("nodeid");

    df.foreachPartition(partition -> {
        Session session = connector.openSession();
        while (partition.hasNext()) {
            Row row = partition.next();
            session.execute("DELETE FROM " + CassandraProperties.KEYSPACE + "." + CassandraProperties.ENERGY_FORECASTS
                    + " WHERE nodeid = '" + row.mkString() + "'"
                    + " AND timestamp >= '" + fromDate + "'"
                    + " AND timestamp <= '" + toDate + "'");
        }
        session.close();
    });
}

@Bean
public SparkSession sparkSession() {
    return SparkSession
            .builder()
            .appName("SparkCassandraApp")
            .config("spark.cassandra.connection.host", host)
            .config("spark.cassandra.connection.port", port)
            .config("spark.sql.caseSensitive", false)
            .master(master)
            .getOrCreate();
}
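
For reference, the row selection in deleteData compares the first ten characters of each timestamp against the two bounds as plain strings. Below is a minimal plain-Java sketch of that comparison, assuming the timestamps are text that starts with yyyy-MM-dd. DatePrefixFilter and inRange are hypothetical names used only for illustration; they are not part of my service.

```java
public class DatePrefixFilter {

    /**
     * Mirrors col("timestamp").substr(1, 10).between(fromDate, toDate).
     * Spark's substr is 1-based, so substr(1, 10) corresponds to substring(0, 10) here.
     * Assumption: timestamps are strings that start with "yyyy-MM-dd".
     */
    public static boolean inRange(String timestamp, String fromDate, String toDate) {
        // Take at most the first ten characters, i.e. the date part.
        String day = timestamp.length() > 10 ? timestamp.substring(0, 10) : timestamp;
        // between(...) is inclusive on both ends; ISO dates sort correctly as strings.
        return day.compareTo(fromDate) >= 0 && day.compareTo(toDate) <= 0;
    }

    public static void main(String[] args) {
        System.out.println(inRange("2020-01-15 10:30:00", "2020-01-01", "2020-01-31"));
        System.out.println(inRange("2020-02-01 00:00:00", "2020-01-01", "2020-01-31"));
    }
}
```
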
The code runs fine against a local Spark master (the .master("local[*]") option).
However, when I run the same code against a remote Spark master, the following error occurs:
Driver stacktrace:] with root cause
java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.Dataset$$anonfun$foreachPartition$2.func$4 of type org.apache.spark.api.java.function.ForeachPartitionFunction in instance of org.apache.spark.sql.Dataset$$anonfun$foreachPartition$2
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[pool-18-thread-1] INFO com.datastax.spark.connector.cql.CassandraConnector - Disconnected from Cassandra cluster: Test Cluster
UPDATE 1
What did the trick for me was adding the following line to the SparkSession configuration:
.config("spark.jars", "meter-service-1.0.jar")
This appears to provide the dependencies whose absence prevented Spark from deserializing the lambda expression correctly on the remote nodes.
This is better explained here
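
To illustrate why the jar matters, here is a small plain-Java sketch (no Spark involved) of how a serializable lambda is represented when it is serialized. Java writes it as a java.lang.invoke.SerializedLambda that records the class in which the lambda was defined, and that class must be loadable on the receiving side to turn it back into a function. That would be consistent with the executors needing my service jar, though I have not traced Spark's internals to confirm it. PartitionFunction, drain and serializedForm are hypothetical names; PartitionFunction only stands in for Spark's ForeachPartitionFunction, which is also Serializable.

```java
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.Iterator;

public class LambdaSerializationDemo {

    /** Hypothetical stand-in for Spark's ForeachPartitionFunction. */
    public interface PartitionFunction extends Serializable {
        void call(Iterator<String> partition) throws Exception;
    }

    /** A lambda defined in this class, similar in shape to the one passed to foreachPartition. */
    public static PartitionFunction drain() {
        return partition -> {
            while (partition.hasNext()) {
                partition.next();
            }
        };
    }

    /**
     * Returns the object Java serialization would write in place of the lambda.
     * Serializable lambdas carry a synthetic private writeReplace() method for this.
     */
    public static SerializedLambda serializedForm(Serializable lambda) throws Exception {
        Method writeReplace = lambda.getClass().getDeclaredMethod("writeReplace");
        writeReplace.setAccessible(true);
        return (SerializedLambda) writeReplace.invoke(lambda);
    }

    public static void main(String[] args) throws Exception {
        SerializedLambda form = serializedForm(drain());
        // The capturing class is the one that must be on the classpath when deserializing.
        System.out.println("capturing class:      " + form.getCapturingClass());
        System.out.println("functional interface: " + form.getFunctionalInterfaceClass());
    }
}
```

In my case the capturing class would be the service class that contains deleteData, which lives in meter-service-1.0.jar.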