I want to encrypt a few columns of a Spark dataframe based on some condition. The encrypt/decrypt function below works fine:

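from cryptography.fernet import Fernet

def EncryptDecrypt(Encrypt, str):
    # Hard-coded key as posted; a Fernet key is 32 bytes, url-safe base64-encoded.
    key = b'B5oRyf5Zs3P7atXIf-I5TaCeF3aM1NEILv3A7Zm93b4='
    cipher_suite = Fernet(key)
    if Encrypt is True:
        # Fernet encrypts bytes, so convert the text first.
        a = bytes(str, "utf-8")
        return cipher_suite.encrypt(a)
    else:
        # decrypt() takes the token and returns the plaintext as bytes.
        return cipher_suite.decrypt(str)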

Now I want to encrypt a specific dataframe column, but only if the encryption condition is satisfied for that dataframe:

if sqldf.filter(condition).count() > 0:  # `condition` stands for the encryption condition
    ...  # iterate over that specific column and encrypt its data

I have to maintain the dataframe's column positions, so I can't add the encrypted column at the end.
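
One way to keep the column order, sketched under assumptions: build the select list in the dataframe's own order and wrap only the chosen columns in the UDF call. `build_select_exprs` is a hypothetical helper name, and `my_udf` is assumed to be a UDF already registered with `spark.udf.register`. The helper only builds strings, so it runs without a Spark session.

```python
def build_select_exprs(columns, to_encrypt, udf_name="my_udf"):
    """Return one SQL expression per column, in the original column order."""
    targets = set(to_encrypt)
    exprs = []
    for name in columns:
        quoted = "`" + name + "`"  # backticks quote identifiers in Spark SQL
        if name in targets:
            # Alias back to the same name so the column keeps its position.
            exprs.append(f"{udf_name}(True, {quoted}) AS {quoted}")
        else:
            exprs.append(quoted)
    return exprs


# Hypothetical column names, for illustration only.
exprs = build_select_exprs(["id", "name", "city"], ["name"])
```

With a real dataframe this could be applied as `sqldf.selectExpr(*build_select_exprs(sqldf.columns, ["name"]))`. `DataFrame.withColumn` called with an existing column name also replaces that column in place, so either route avoids appending the encrypted column at the end.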

Please help me iterate over the dataframe rows, and let me know if there is a more optimized approach.


Edit: below is the approach I am using.

I am trying to call the UDF through Spark SQL, but I get the error `TypeError: encoding without a string argument` at the line `a = bytes(str, "utf-8")`. This is the code I use to register the UDF and run it with Spark SQL:

spark.udf.register("my_udf", EncryptDecrypt, ByteType())
sqldf1 = spark.sql("Select my_udf(True, " + column + ") from df1")

column is the field name.
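
A minimal sketch of where that message comes from, assuming the UDF receives a value that is not a Python `str` (for example a numeric or null column value); this is not confirmed for this job. `bytes(value, "utf-8")` raises exactly this `TypeError` for any non-string argument. `to_utf8_bytes` is a hypothetical helper that converts the value to text first.

```python
def to_utf8_bytes(value):
    """Convert a column value to UTF-8 bytes, whatever its Python type."""
    if value is None:
        return None  # keep SQL NULLs as NULL instead of encrypting "None"
    if isinstance(value, bytes):
        return value
    return str(value).encode("utf-8")


# Reproduce the error with a non-string argument.
try:
    bytes(12345, "utf-8")
except TypeError as exc:
    message = str(exc)

converted = to_utf8_bytes(12345)
```

Inside `EncryptDecrypt`, the same conversion could replace `bytes(str, "utf-8")`; casting the column to string in the query, as in `cast(col as string)`, is another option.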

Ajay
  • To call a custom method in Spark you can use a UDF, see e.g.: https://stackoverflow.com/questions/52522057/pyspark-udf-column-on-dataframe – Shaido Jan 04 '19 at 06:24
  • Please see my edits – Ajay Jan 04 '19 at 07:07
  • It seems that the type of the column you use is not a string, which `bytes` requires. You can check the type with `printSchema` and cast it either inside the method or before calling it. – Shaido Jan 04 '19 at 07:22
  • I am passing a string to the function `EncryptDecrypt`, and within the function this string is converted into bytes, so I am not sure what to do next. Can you please confirm whether this is the right way to call a UDF from Spark SQL? – Ajay Jan 04 '19 at 08:06
  • I checked the `str` type, it is ``. Now I am able to encrypt it correctly, but instead of storing the bytes in the dataframe it stores `null`. Any leads? Do I need to change the dataframe column type? – Ajay Jan 04 '19 at 08:39
  • It could be that you want to use `BinaryType`, which is a byte array. – Shaido Jan 04 '19 at 08:41
  • I get an exception when I use `BinaryType`; type(output) of `EncryptDecrypt` is ``. It seems there is a mismatch between the return type given during UDF registration and the function's actual return type, but I am not sure how to correct this. – Ajay Jan 04 '19 at 09:15
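
On the return type discussed in the comments above: `ByteType` is a single signed byte, not a byte array, which would explain the `null` values. One possible workaround, as a sketch only: Fernet tokens are URL-safe base64 bytes, so they can be decoded to ASCII text and stored in a `StringType` column. The helper names are hypothetical, and `base64.urlsafe_b64encode` is used here as a stand-in for a real token so the example runs without extra packages.

```python
import base64


def token_to_text(token):
    """Turn an encrypted token (bytes) into text a StringType column can hold."""
    return None if token is None else token.decode("ascii")


def text_to_token(text):
    """Turn the stored text back into the bytes token before decrypting."""
    return None if text is None else text.encode("ascii")


# Stand-in for a token; a real one would come from cipher_suite.encrypt(...).
sample_token = base64.urlsafe_b64encode(b"not a real ciphertext")
stored = token_to_text(sample_token)
restored = text_to_token(stored)
```

In the UDF this would mean returning `cipher_suite.encrypt(a).decode("ascii")` and registering it with `StringType()` instead of `ByteType()`.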
