
When I execute my UDF on a local Spark cluster it works as expected. When I execute it on Docker VMs, I get the following error:

Driver stacktrace:] with root cause

java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2934) ~[na:na]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1704) ~[na:na]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431) ~[na:na]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355) ~[na:na]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) ~[na:na]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) ~[na:na]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) ~[na:na]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) ~[na:na]
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87) ~[spark-core_2.12-3.3.0.jar:3.3.0]
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129) ~[spark-core_2.12-3.3.0.jar:3.3.0]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:507) ~[spark-core_2.12-3.3.0.jar:3.3.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:na]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:na]
    at java.lang.Thread.run(Thread.java:750) ~[na:na]

The driver code:

StructType structType = new StructType()
        .add("number", DataTypes.IntegerType)
        .add("factorial", DataTypes.IntegerType);

List<Row> rows = numbers.stream()
        .map(i -> new GenericRowWithSchema(new Integer[]{i, i}, structType))
        .collect(Collectors.toList());
JavaRDD<Row> words = sparkContext.parallelize(rows);

UserDefinedFunction factorialFunction = udf(
        new Factorial(), DataTypes.LongType
);

sparkSession.udf().register("factorialFunction", factorialFunction);

Dataset<Row> dataFrame = sparkSession.createDataFrame(words, structType);
// >> the next line fails in cluster mode; local mode works fine
// https://stackoverflow.com/questions/28186607/java-lang-classcastexception-using-lambda-expressions-in-spark-job-on-remote-ser/28367602#28367602
List<Row> result = dataFrame
        .select(col("number"), factorialFunction.apply(col("factorial")))
        .collectAsList();
return result.stream()
        .distinct()
        .collect(Collectors.toMap(e -> (Integer) e.get(0), e -> (Long) e.get(1)));
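
For context, Factorial is just a plain implementation of Spark's Java UDF1 interface, roughly along the lines of the following sketch (simplified, not the exact class):

// Simplified stand-in for the Factorial UDF (sketch, not the exact class);
// UDF1 already extends Serializable, so instances can be shipped to executors.
import org.apache.spark.sql.api.java.UDF1;

public class Factorial implements UDF1<Integer, Long> {
    @Override
    public Long call(Integer n) {
        long result = 1L;
        for (int i = 2; i <= n; i++) {
            result *= i;
        }
        return result;
    }
}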

The Spark configuration:

new SparkConf()
                .setAppName(appName)
                .setMaster(masterUri)
                .setJars(new String[]{"target/hello-spark-0.0.1-SNAPSHOT.jar"})
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.kryo.registrationRequired", "true")
                .registerKryoClasses(new Class[]{
                        Factorial.class,
                        WordCountService.class,
                        GenericRowWithSchema.class,
                        StructType.class,
                        StructField.class,
                        StructField[].class,
                        IntegerType$.class,
                        Metadata.class,
                        Integer[].class})
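
The SparkSession and JavaSparkContext used in the driver code are created from this configuration in the usual way, roughly like this (simplified sketch, the Spring bean wiring is left out; sparkConf just stands for the configuration shown above):

// Sketch of how the session/context used in the driver code are obtained.
SparkSession sparkSession = SparkSession.builder()
        .config(sparkConf)   // the SparkConf shown above
        .getOrCreate();
JavaSparkContext sparkContext = new JavaSparkContext(sparkSession.sparkContext());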

The default Spring profile runs against the local cluster; the 'local' profile connects to a single Spark master node, which delegates to two worker nodes. I also tried plain Java serialization (the configuration variant is sketched after the stack trace below), which led to:

java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301) ~[na:na]
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431) ~[na:na]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2437) ~[na:na]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355) ~[na:na]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) ~[na:na]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) ~[na:na]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431) ~[na:na]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355) ~[na:na]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) ~[na:na]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) ~[na:na]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) ~[na:na]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) ~[na:na]
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527) ~[scala-library-2.12.15.jar:na]
    at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:na]
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184) ~[na:na]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2322) ~[na:na]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) ~[na:na]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) ~[na:na]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431) ~[na:na]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355) ~[na:na]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) ~[na:na]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) ~[na:na]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431) ~[na:na]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355) ~[na:na]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) ~[na:na]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) ~[na:na]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) ~[na:na]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) ~[na:na]
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87) ~[spark-core_2.12-3.3.0.jar:3.3.0]
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129) ~[spark-core_2.12-3.3.0.jar:3.3.0]
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83) ~[spark-core_2.12-3.3.0.jar:3.3.0]
    at org.apache.spark.scheduler.Task.run(Task.scala:136) ~[spark-core_2.12-3.3.0.jar:3.3.0]
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.0.jar:3.3.0]
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504) ~[spark-core_2.12-3.3.0.jar:3.3.0]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.0.jar:3.3.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:na]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:na]
    at java.lang.Thread.run(Thread.java:750) ~[na:na]
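
For that attempt the configuration was essentially the one above without the Kryo settings, i.e. roughly:

// Configuration variant for the Java-serialization attempt (sketch): the Kryo
// settings are dropped and the serializer set explicitly to Spark's default.
new SparkConf()
        .setAppName(appName)
        .setMaster(masterUri)
        .setJars(new String[]{"target/hello-spark-0.0.1-SNAPSHOT.jar"})
        .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");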

I have also looked through all the related questions on SO. During startup I can see that the JAR file is loaded:

org.apache.spark.SparkContext: Added JAR target/hello-spark-0.0.1-SNAPSHOT.jar at spark://192.168.178.172:54061/jars/hello-spark-0.0.1-SNAPSHOT.jar with timestamp 1656517689330

Please help; I have spent two days on this and still have no luck.

Kind Regards

Tibor