I have a Scala function for encryption. I created a udf from it and apply it to one of the columns of my als_embeddings dataframe to add a new column.

import java.util.Base64
import javax.crypto.Cipher
import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}

val Algorithm = "AES/CBC/PKCS5Padding"
val Key = new SecretKeySpec(Base64.getDecoder.decode("BiwHeIqzQa8X6MXtdg/hhQ=="), "AES")
val IvSpec = new IvParameterSpec(new Array[Byte](16))

def encrypt(text: String): String = {
  val cipher = Cipher.getInstance(Algorithm)
  cipher.init(Cipher.ENCRYPT_MODE, Key, IvSpec)

  new String(Base64.getEncoder.encode(cipher.doFinal(text.getBytes("utf-8"))), "utf-8")
}


val encryptUDF = udf((uid : String) => encrypt(uid))

I pass encryptUDF to my Spark dataframe to create a new column containing the encrypted uid:

val als_encrypt_embeddings = als_embeddings.withColumn("encrypt_uid",encryptUDF(col("uid")))
als_encrypt_embeddings.show()

But when I do this, I get the following error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable

What am I missing here?

toofrellik
  • Take a look at https://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou – Emiliano Martinez Mar 16 '20 at 12:17
  • As you are already declaring encryptUDF as a val, try putting the function implementation there itself, instead of creating a def and then assigning it to value function. If it works then as to why it works, you can refer to the other stackoverflow post shared above. – Amit Mar 16 '20 at 14:08

2 Answers

The error message Task not serializable is correct but not very clear. Further down in the stack trace there is a more detailed explanation of what went wrong:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
[...]
Caused by: java.io.NotSerializableException: javax.crypto.spec.IvParameterSpec
Serialization stack:
    - object not serializable (class: javax.crypto.spec.IvParameterSpec, value: javax.crypto.spec.IvParameterSpec@7d4d65f5)
    - field (class: Starter$$anonfun$1, name: IvSpec$1, type: class javax.crypto.spec.IvParameterSpec)
    - object (class Starter$$anonfun$1, <function1>)
    - element of array (index: 2)
    - array (class [Ljava.lang.Object;, size 3)
    - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13, <function2>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
    ... 48 more

In the Caused by part of the stacktrace Spark reports that it was not able to serialize an instance of javax.crypto.spec.IvParameterSpec.

The IvParameterSpec was created in the driver JVM, while the udf is executed on one of the executors. The object therefore has to be serialized to move it to the executor's VM. Because it is not serializable, the attempt to move it fails.
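The serializability claim can be checked outside Spark with plain Java serialization. The following is a minimal sketch; the object name SerializationCheck and the helper isJavaSerializable are hypothetical names introduced only for this illustration, and the check only approximates what Spark's closure serializer does.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.util.Base64
import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}

// Hypothetical helper, not part of Spark: tries to write an object with
// Java serialization and reports whether that succeeded.
object SerializationCheck {
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }
}

// Same objects as in the question.
val key = new SecretKeySpec(Base64.getDecoder.decode("BiwHeIqzQa8X6MXtdg/hhQ=="), "AES")
val ivSpec = new IvParameterSpec(new Array[Byte](16))

println(SerializationCheck.isJavaSerializable(key))    // SecretKeySpec implements java.io.Serializable
println(SerializationCheck.isJavaSerializable(ivSpec)) // IvParameterSpec does not
```

This matches the stack trace: only the IvParameterSpec field of the closure is reported as not serializable.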

The easiest way to fix the problem is to create the objects needed for the encryption directly within the executor's VM by moving the code block into the udf's closure:

val encryptUDF = udf((uid : String) => {
  val Algorithm = "AES/CBC/PKCS5Padding"
  val Key = new SecretKeySpec(Base64.getDecoder.decode("BiwHeIqzQa8X6MXtdg/hhQ=="), "AES")
  val IvSpec = new IvParameterSpec(new Array[Byte](16))

  def encrypt(text: String): String = {
    val cipher = Cipher.getInstance(Algorithm)
    cipher.init(Cipher.ENCRYPT_MODE, Key, IvSpec)

    new String(Base64.getEncoder.encode(cipher.doFinal(text.getBytes("utf-8"))), "utf-8")
  }
  encrypt(uid)
})

This way, all objects are created directly within the executor's VM.

The downside of this approach is that one set of encryption objects is created per invocation of the udf. This might cause performance problems if instantiating these objects is expensive. One option would be to use mapPartitions instead of a udf. A linked answer uses mapPartitions to avoid creating too many expensive database connections while iterating over a dataframe. The same approach could be used here.
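A minimal sketch of the mapPartitions idea, assuming the same key, IV and algorithm as in the question. The names PartitionEncryptor and encryptPartition are hypothetical. The function takes an iterator of uids, as mapPartitions would supply, builds the cipher once, and reuses it for every element. Reuse relies on doFinal resetting the cipher to the state of its last init call.

```scala
import java.util.Base64
import javax.crypto.Cipher
import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}

// Hypothetical helper: encrypts all uids of one partition with a single cipher.
object PartitionEncryptor {
  def encryptPartition(uids: Iterator[String]): Iterator[String] = {
    // Created once per call, i.e. once per partition when used with mapPartitions.
    val key = new SecretKeySpec(Base64.getDecoder.decode("BiwHeIqzQa8X6MXtdg/hhQ=="), "AES")
    val ivSpec = new IvParameterSpec(new Array[Byte](16))
    val cipher = Cipher.getInstance("AES/CBC/PKCS5Padding")
    cipher.init(Cipher.ENCRYPT_MODE, key, ivSpec)

    // doFinal resets the cipher to its initialized state, so it can be reused.
    uids.map { uid =>
      Base64.getEncoder.encodeToString(cipher.doFinal(uid.getBytes("utf-8")))
    }
  }
}

// Assumed usage on the question's dataframe (needs `import spark.implicits._`
// for the String encoder); note this yields only the encrypted values:
// val encryptedUids = als_embeddings.select("uid").as[String]
//   .mapPartitions(PartitionEncryptor.encryptPartition _)
```

Unlike the udf, this variant returns a Dataset of encrypted strings rather than adding a column, so keeping the other columns would require mapping over whole rows instead.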

werner

We can define the function as part of a standalone object that has no references to unserializable values.

object EncryptUtils extends Serializable {
  ...
  ...
  ...
}
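The body of the object is elided above. One possible shape, as a sketch only and not necessarily what the answer's author had in mind, is shown below under the hypothetical name EncryptUtilsSketch. It assumes the object is defined at the top level of a compiled application. The udf's lambda then refers to the object statically rather than capturing its fields, so each executor JVM initializes its own copy and the IvParameterSpec never has to be serialized. Behavior may differ when the object is defined in spark-shell or nested inside another class.

```scala
import java.util.Base64
import javax.crypto.Cipher
import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}

// Hypothetical sketch: all encryption state lives inside the object,
// so a closure calling encrypt does not capture these values.
object EncryptUtilsSketch extends Serializable {
  private val Algorithm = "AES/CBC/PKCS5Padding"
  private val Key = new SecretKeySpec(Base64.getDecoder.decode("BiwHeIqzQa8X6MXtdg/hhQ=="), "AES")
  private val IvSpec = new IvParameterSpec(new Array[Byte](16))

  def encrypt(text: String): String = {
    // A fresh Cipher per call, because Cipher instances are not thread-safe.
    val cipher = Cipher.getInstance(Algorithm)
    cipher.init(Cipher.ENCRYPT_MODE, Key, IvSpec)
    Base64.getEncoder.encodeToString(cipher.doFinal(text.getBytes("utf-8")))
  }
}

// Assumed usage with the question's dataframe:
// val encryptUDF = udf((uid: String) => EncryptUtilsSketch.encrypt(uid))
// val als_encrypt_embeddings = als_embeddings.withColumn("encrypt_uid", encryptUDF(col("uid")))
```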
David Buck
Ramesh KB