I am trying to execute the following code:-
SparkSession sparkSession = SparkSession
.builder()
.appName("test")
// .master("local")
.enableHiveSupport()
.getOrCreate();
Dataset<Row> df = sparkSession.sql("## My SQL query to HIVE here ##").toDF();
Dataset<Row> rulesDataset = sparkSession.sql("select * from rules.md");
String code = rulesDataset.collectAsList().get(0).get(0).toString();
df.show(false);
Script script=new GroovyShell().parse(code);
UDF3 rulesExecutingUDF = new UDF3<Double,String,String, String>() {
@Override
public String call(Double val1,String val2,String val3) throws Exception {
Binding binding = new Binding();
binding.setVariable("VAL1",val1);
binding.setVariable("VAL2", val2);
binding.setVariable("VAL3", val3);
script.setBinding(binding);
Object value = script.run();
return value.toString();
}
};
sparkSession.udf().register("rulesExecutingUDF",rulesExecutingUDF, DataTypes.StringType);
df=df.withColumn("NEW_COL",callUDF("rulesExecutingUDF",col("VAL1"),col("VAL2"),col("VAL3")));
df.show();
The issue is I am getting a serialization error here, saying task not serializable. I did a lot of trial and error, and found that the statement
Dataset<Row> df = sparkSession.sql("## My SQL query to HIVE here ##").toDF();
might have something to do with this. I am getting this dataset from the hive table from the server.
What I did was I prepared a similar dataset with similar schema and instead of that query, if I use a hardcoded variable like
StructField[] structFields = new StructField[]{
new StructField("VAL1", DataTypes.DoubleType, true, Metadata.empty()),
new StructField("VAL2", DataTypes.StringType, true, Metadata.empty()),
new StructField("VAL3", DataTypes.StringType, true, Metadata.empty())
};
StructType structType = new StructType(structFields);
List<Row> rows = new ArrayList<>();
rows.add(RowFactory.create(160.0,"X","I"));
rows.add(RowFactory.create(200.0,"D","C"));
Dataset<Row> df = sparkSession.createDataFrame(rows, structType);
then I am not getting the serializable error and the spark job successfully executes.
The schema of the datasets created both ways is same and the values also I got from the hive table. I am unable to find why this is happening. Can someone help me out here?
Debugger message
diagnostics: User class threw exception: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159) at org.apache.spark.SparkContext.clean(SparkContext.scala:2304) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849) at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:608) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337) at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278) at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489) at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489) at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258) at org.apache.spark.sql.Dataset.head(Dataset.scala:2489) at org.apache.spark.sql.Dataset.take(Dataset.scala:2703) at org.apache.spark.sql.Dataset.showString(Dataset.scala:254) at org.apache.spark.sql.Dataset.show(Dataset.scala:725) at com.test2.FeefactMD.main(FeefactMD.java:65) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721) Caused by: java.io.NotSerializableException: Script1 Serialization stack: - object not serializable (class: Script1, value: Script1@121b5eab) - element of array (index: 0) - array (class [Ljava.lang.Object;, size 1) - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;) - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.test2.FeefactMD, functionalInterfaceMethod=org/apache/spark/sql/api/java/UDF3.call:(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic com/test2/FeefactMD.lambda$main$c068ede9$1:(Lgroovy/lang/Script;Ljava/lang/Double;Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;, instantiatedMethodType=(Ljava/lang/Double;Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;, numCaptured=1]) - writeReplace data (class: java.lang.invoke.SerializedLambda) - object (class com.test2.FeefactMD$$Lambda$61/1143680308, com.test2.FeefactMD$$Lambda$61/1143680308@1fe9c374) - field (class: org.apache.spark.sql.UDFRegistration$$anonfun$30, name: f$20, type: interface org.apache.spark.sql.api.java.UDF3) - object (class org.apache.spark.sql.UDFRegistration$$anonfun$30, ) - element of array (index: 5) - array (class [Ljava.lang.Object;, size 6) - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, name: references$1, type: class [Ljava.lang.Object;) - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, ) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)