The error message "Task not serializable" is correct but not very clear. Further down in the stacktrace there is a more detailed explanation of what went wrong:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
[...]
Caused by: java.io.NotSerializableException: javax.crypto.spec.IvParameterSpec
Serialization stack:
- object not serializable (class: javax.crypto.spec.IvParameterSpec, value: javax.crypto.spec.IvParameterSpec@7d4d65f5)
- field (class: Starter$$anonfun$1, name: IvSpec$1, type: class javax.crypto.spec.IvParameterSpec)
- object (class Starter$$anonfun$1, <function1>)
- element of array (index: 2)
- array (class [Ljava.lang.Object;, size 3)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13, name: references$1, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 48 more
In the "Caused by" part of the stacktrace, Spark reports that it was not able to serialize an instance of javax.crypto.spec.IvParameterSpec.
The IvParameterSpec has been created within the driver JVM, while the UDF is executed on the executors. The UDF's closure holds a reference to it (the field IvSpec$1 of Starter$$anonfun$1 in the serialization stack), so Spark has to serialize the object in order to ship it to the executors' JVMs. As IvParameterSpec does not implement java.io.Serializable, this attempt fails.
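The failure can be reproduced without Spark. Spark serializes closures with plain Java serialization (note JavaSerializer in the stack above), so a minimal sketch that pushes the two crypto objects through an ObjectOutputStream shows which one is the culprit. The object SerializationCheck and its helper isJavaSerializable are hypothetical names used only for this illustration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.util.Base64
import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}

object SerializationCheck {
  // Hypothetical helper: tries to Java-serialize obj, which is roughly what
  // Spark's JavaSerializer does with the fields captured by a closure.
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val key = new SecretKeySpec(Base64.getDecoder.decode("BiwHeIqzQa8X6MXtdg/hhQ=="), "AES")
    val iv = new IvParameterSpec(new Array[Byte](16))
    // SecretKeySpec implements java.security.Key, which extends Serializable.
    println(s"SecretKeySpec serializable:   ${isJavaSerializable(key)}")
    // IvParameterSpec only implements AlgorithmParameterSpec, so this is false.
    println(s"IvParameterSpec serializable: ${isJavaSerializable(iv)}")
  }
}
```

Running main should report true for the key and false for the IV spec, which matches the single NotSerializableException in the stacktrace.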
The easiest way to fix the problem is to create the objects needed for the encryption directly within the executors' JVMs by moving the code block into the UDF's closure:
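import java.util.Base64
import javax.crypto.Cipher
import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}
import org.apache.spark.sql.functions.udf

val encryptUDF = udf((uid: String) => {
  // All crypto objects are created inside the closure, i.e. on the executor,
  // so the closure no longer captures a non-serializable object from the driver.
  val Algorithm = "AES/CBC/PKCS5Padding"
  val Key = new SecretKeySpec(Base64.getDecoder.decode("BiwHeIqzQa8X6MXtdg/hhQ=="), "AES")
  val IvSpec = new IvParameterSpec(new Array[Byte](16))
  def encrypt(text: String): String = {
    val cipher = Cipher.getInstance(Algorithm)
    cipher.init(Cipher.ENCRYPT_MODE, Key, IvSpec)
    new String(Base64.getEncoder.encode(cipher.doFinal(text.getBytes("utf-8"))), "utf-8")
  }
  encrypt(uid)
})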
This way all objects are created directly within the executors' JVMs.
The downside of this approach is that a new set of encryption objects is created on every invocation of the UDF. This might cause performance problems if instantiating these objects is expensive. One option would be to use mapPartitions instead of a UDF, so that the objects are created only once per partition. In this answer, mapPartitions is used to avoid creating too many expensive database connections while iterating over a dataframe; the same approach could be used here.
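A minimal sketch of the per-partition idea, assuming the uids are available as a Dataset[String]: the function below creates the Cipher once per partition and reuses it for every row of that partition. The names PartitionEncryption and encryptPartition are hypothetical; only the key, IV and algorithm are taken from the UDF above. With Spark, the function would be passed to Dataset.mapPartitions, which needs an implicit Encoder[String] (for example via import spark.implicits._). The sketch itself has no Spark dependency, so it can be tested on its own.

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64
import javax.crypto.Cipher
import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}

object PartitionEncryption {
  // Hypothetical helper intended for mapPartitions, e.g.
  //   uids.mapPartitions(PartitionEncryption.encryptPartition)  // uids: Dataset[String]
  // It runs on the executor, so nothing non-serializable is captured from the driver.
  def encryptPartition(rows: Iterator[String]): Iterator[String] = {
    // Created once per partition instead of once per row.
    val key = new SecretKeySpec(Base64.getDecoder.decode("BiwHeIqzQa8X6MXtdg/hhQ=="), "AES")
    val iv = new IvParameterSpec(new Array[Byte](16))
    val cipher = Cipher.getInstance("AES/CBC/PKCS5Padding")
    cipher.init(Cipher.ENCRYPT_MODE, key, iv)
    // doFinal resets the cipher to its initialized state, so it can be reused
    // for the next row; a partition's iterator is consumed by a single thread.
    rows.map { text =>
      Base64.getEncoder.encodeToString(cipher.doFinal(text.getBytes(StandardCharsets.UTF_8)))
    }
  }
}
```

Base64.getEncoder.encodeToString produces the same string as the original new String(Base64.getEncoder.encode(...), "utf-8"), so the output should match the UDF row for row. Note that reusing one Cipher like this is fine for CBC but would not be for modes such as GCM, which refuse to encrypt twice with the same key and IV.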