When I execute my UDF on a local Spark cluster it works as expected. When I execute it on Docker VMs I get the following error:
Driver stacktrace:] with root cause
java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2934) ~[na:na]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1704) ~[na:na]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431) ~[na:na]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355) ~[na:na]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) ~[na:na]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) ~[na:na]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) ~[na:na]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) ~[na:na]
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87) ~[spark-core_2.12-3.3.0.jar:3.3.0]
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129) ~[spark-core_2.12-3.3.0.jar:3.3.0]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:507) ~[spark-core_2.12-3.3.0.jar:3.3.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:na]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:na]
at java.lang.Thread.run(Thread.java:750) ~[na:na]
The driver code:
StructType structType = new StructType()
.add("number", DataTypes.IntegerType)
.add("factorial", DataTypes.IntegerType);
List<Row> rows = numbers.stream().map(i -> new GenericRowWithSchema(new Integer[]{i, i}, structType)).collect(Collectors.toList());
JavaRDD<Row> words = sparkContext.parallelize(rows);
UserDefinedFunction factorialFunction = udf(
new Factorial(), DataTypes.LongType
);
sparkSession.udf().register("factorialFunction", factorialFunction);
Dataset<Row> dataFrame = sparkSession.createDataFrame(words, structType);
// >> the next line fails in cluster mode; local mode works fine
//https://stackoverflow.com/questions/28186607/java-lang-classcastexception-using-lambda-expressions-in-spark-job-on-remote-ser/28367602#28367602
List<Row> result = dataFrame.select(col("number"), factorialFunction.apply(col("factorial"))).collectAsList();
return result.stream().distinct().collect(Collectors.toMap(e -> (Integer) e.get(0), e -> (Long) e.get(1)));
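For context, Factorial is a plain UDF1 implementation. The version below is a minimal sketch assuming UDF1<Integer, Long> (matching the registered return type DataTypes.LongType), not necessarily the exact class:

import org.apache.spark.sql.api.java.UDF1;

// Minimal sketch of the Factorial UDF (UDF1 already extends Serializable).
public class Factorial implements UDF1<Integer, Long> {
    @Override
    public Long call(Integer n) {
        long result = 1L;
        for (int i = 2; i <= n; i++) {
            result *= i;
        }
        return result;
    }
}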
The Spark configuration:
new SparkConf()
.setAppName(appName)
.setMaster(masterUri)
.setJars(new String[]{"target/hello-spark-0.0.1-SNAPSHOT.jar"})
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrationRequired", "true")
.registerKryoClasses(new Class[]{
Factorial.class,
WordCountService.class,
GenericRowWithSchema.class,
StructType.class,
StructField.class,
StructField[].class,
IntegerType$.class,
Metadata.class,
Integer[].class})
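Not shown above: the sparkSession and sparkContext used in the driver code are built from this conf, roughly as follows (a sketch; the actual wiring happens in Spring beans):

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

// Sketch: deriving the session and context used in the driver code from the conf above.
SparkSession sparkSession = SparkSession.builder()
        .config(sparkConf)   // sparkConf = the SparkConf shown above
        .getOrCreate();
JavaSparkContext sparkContext = new JavaSparkContext(sparkSession.sparkContext());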
The default Spring profile executes on the local cluster; the 'local' profile connects to a single Spark master node, which delegates to two worker nodes (a sketch of that profile wiring follows the stack trace below). I also tried the Java serialization, which led me to:
java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301) ~[na:na]
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431) ~[na:na]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2437) ~[na:na]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355) ~[na:na]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) ~[na:na]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) ~[na:na]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431) ~[na:na]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355) ~[na:na]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) ~[na:na]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) ~[na:na]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) ~[na:na]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) ~[na:na]
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527) ~[scala-library-2.12.15.jar:na]
at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:na]
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184) ~[na:na]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2322) ~[na:na]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) ~[na:na]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) ~[na:na]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431) ~[na:na]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355) ~[na:na]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) ~[na:na]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) ~[na:na]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431) ~[na:na]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355) ~[na:na]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) ~[na:na]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) ~[na:na]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) ~[na:na]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) ~[na:na]
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87) ~[spark-core_2.12-3.3.0.jar:3.3.0]
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129) ~[spark-core_2.12-3.3.0.jar:3.3.0]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83) ~[spark-core_2.12-3.3.0.jar:3.3.0]
at org.apache.spark.scheduler.Task.run(Task.scala:136) ~[spark-core_2.12-3.3.0.jar:3.3.0]
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.0.jar:3.3.0]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504) ~[spark-core_2.12-3.3.0.jar:3.3.0]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.0.jar:3.3.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:na]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:na]
at java.lang.Thread.run(Thread.java:750) ~[na:na]
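For reference, the profile-based master selection mentioned before the stack trace looks roughly like this (the bean names and the master host/port are placeholders, not the exact code):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

// Sketch of the profile-dependent masterUri (values are placeholders).
@Configuration
public class SparkProfileConfig {

    @Bean("masterUri")
    @Profile("default")
    public String localMaster() {
        return "local[*]";                  // default profile: embedded local cluster
    }

    @Bean("masterUri")
    @Profile("local")
    public String standaloneMaster() {
        return "spark://spark-master:7077"; // 'local' profile: standalone master with two workers
    }
}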
I have also looked up all the corresponding questions on SO. During startup I can see that the JAR file is added:
org.apache.spark.SparkContext: Added JAR target/hello-spark-0.0.1-SNAPSHOT.jar at spark://192.168.178.172:54061/jars/hello-spark-0.0.1-SNAPSHOT.jar with timestamp 1656517689330
Please help; I have spent two days on this and still have no luck.
Kind Regards