I need to add an encrypted version of certain elements to a complex nested structure using Spark Streaming. The incoming JSON records can have different schemas, so I'm looking for a dynamic solution in which I do not need to hardcode Spark schemas.
For instance, this is one of the JSON documents I could get:
{
  "hello": "world",
  "thisisastruct": {
    "thisisanarray": [
      {
        "thisisanotherstruct": {
          "ID": 1,
          "thisisthevaluetoencrypt": "imthevaluetoencrypt"
        }
      }
    ]
  }
}
What I would like to accomplish is getting the following:
{
  "hello": "world",
  "thisisastruct": {
    "thisisanarray": [
      {
        "thisisanotherstruct": {
          "ID": 1,
          "thisisthevaluetoencrypt": "imthevaluetoencrypt",
          "thisisthevaluetoencrypt_masked": "BNHFBYHTYBFDBY"
        }
      }
    ]
  }
}
As I mentioned, the schemas can differ, so I may also get something like this:
{
  "hello": "world",
  "thisisastruct": {
    "thisisanarray": [
      {
        "thisisanotherstruct": {
          "onemorestruct": {
            "ID": 1,
            "thisisthevaluetoencrypt": "imthevaluetoencrypt"
          }
        }
      }
    ],
    "thisisanothervaluetoencrypt": "imtheothervaluetoencrypt"
  }
}
And I would like to get something like this:
{
  "hello": "world",
  "thisisastruct": {
    "thisisanarray": [
      {
        "thisisanotherstruct": {
          "onemorestruct": {
            "ID": 1,
            "thisisthevaluetoencrypt": "imthevaluetoencrypt",
            "thisisthevaluetoencrypt_masked": "BNHFBYHTYBFDBY"
          }
        }
      }
    ],
    "thisisanothervaluetoencrypt": "imtheothervaluetoencrypt",
    "thisisanothervaluetoencrypt_masked": "TYHRBVTRHTYJTJ"
  }
}
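Because the target fields can sit at different depths depending on the record, a schema-driven approach would first have to discover where they are at runtime. A minimal sketch of that discovery step, assuming the schema is available as the dict returned by PySpark's `StructType.jsonValue()` (e.g. `df.schema.jsonValue()`): the helper `find_target_paths` is hypothetical, it only descends into structs and arrays, and `[]` in a returned path marks a step into array elements. The resulting paths could then drive a struct rebuild, for example with `pyspark.sql.functions.transform` for arrays and `Column.withField` (Spark 3.1+) for structs.

```python
from typing import Any, List, Set


def find_target_paths(data_type: Any, targets: Set[str], prefix: str = "") -> List[str]:
    """Return dotted paths to every field whose name is in `targets`.

    `data_type` is a dict in the format produced by StructType.jsonValue()
    (or one of its nested "type" entries). "[]" marks a step into an array's
    elements. Map types are not descended into in this sketch.
    """
    # Primitive types appear as plain strings such as "string" or "long".
    if not isinstance(data_type, dict):
        return []
    paths: List[str] = []
    if data_type["type"] == "struct":
        for field in data_type["fields"]:
            path = f"{prefix}.{field['name']}" if prefix else field["name"]
            if field["name"] in targets:
                paths.append(path)
            # Keep descending: another target may sit deeper under this field.
            paths.extend(find_target_paths(field["type"], targets, path))
    elif data_type["type"] == "array":
        paths.extend(find_target_paths(data_type["elementType"], targets, prefix + "[]"))
    return paths
```

For the second input document above, searching for `{"thisisthevaluetoencrypt", "thisisanothervaluetoencrypt"}` would yield one path through the array (`thisisastruct.thisisanarray[].thisisanotherstruct.onemorestruct.thisisthevaluetoencrypt`) and one plain struct path (`thisisastruct.thisisanothervaluetoencrypt`).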
I have a Python function to encrypt the value; however, I'm unable to change the struct dynamically. I think the approach in the following question could help, but I don't have any Scala experience, so I haven't been able to translate it to PySpark or adapt it so that it adds a new field instead of replacing the existing value:
Change value of nested column in DataFrame
Any help would be greatly appreciated.
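One way to sidestep Spark schemas entirely is to keep each record as a raw JSON string and add the masked siblings in plain Python. The sketch below assumes the encryption function is passed in as a callable, and the names `add_masked_fields` and `mask_json_string` are hypothetical. It walks any nesting of objects and arrays and, for every key in `targets`, adds `<key>_masked` next to it while leaving the original value untouched.

```python
import json
from typing import Any, Callable, FrozenSet, Iterable


def add_masked_fields(node: Any, targets: FrozenSet[str],
                      encrypt: Callable[[Any], str], suffix: str = "_masked") -> Any:
    """Return a copy of `node` where each key in `targets` gets a sibling
    `<key><suffix>` holding encrypt(value). Null values are not masked."""
    if isinstance(node, dict):
        out = {}
        for key, value in node.items():
            # Recurse first so targets nested inside this value are handled too.
            out[key] = add_masked_fields(value, targets, encrypt, suffix)
            if key in targets and value is not None:
                out[key + suffix] = encrypt(value)
        return out
    if isinstance(node, list):
        return [add_masked_fields(item, targets, encrypt, suffix) for item in node]
    # Scalars (strings, numbers, booleans, null) are returned unchanged.
    return node


def mask_json_string(payload: str, targets: Iterable[str],
                     encrypt: Callable[[Any], str]) -> str:
    """Parse one JSON document, add the masked siblings, and serialize it again."""
    return json.dumps(add_masked_fields(json.loads(payload), frozenset(targets), encrypt))
```

In a streaming job this could be wrapped in a UDF, e.g. `F.udf(lambda s: mask_json_string(s, TARGETS, encrypt), StringType())` applied to the raw value column cast to string (`TARGETS` and `encrypt` being your own field list and encryption function). The trade-off is that the output stays a JSON string rather than a typed struct; `encrypt` must also return something JSON-serializable, such as a base64 string, because `json.dumps` cannot serialize `bytes`.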
EDIT: This is the function I use to encrypt the data. I currently call it through a UDF, but I can change that if needed:
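import boto3

# key_id (the KMS key ID, ARN or alias) is assumed to be defined elsewhere
def encrypt_string(s):
    kms = boto3.client('kms', region_name='us-west-2')
    response = kms.encrypt(
        KeyId=key_id,
        Plaintext=str(s).encode('utf-8')
    )
    # CiphertextBlob is raw bytes, not text
    return response['CiphertextBlob']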
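`encrypt_string` returns KMS's `CiphertextBlob`, which is raw `bytes`, whereas the desired `_masked` fields are text. A minimal sketch, assuming base64 is an acceptable text encoding for the ciphertext: the wrapper `make_text_encryptor` is hypothetical and takes any function that returns bytes, so it can be exercised without AWS access.

```python
import base64
from typing import Any, Callable


def make_text_encryptor(encrypt_bytes: Callable[[Any], bytes]) -> Callable[[Any], str]:
    """Wrap a function that returns raw ciphertext bytes (e.g. a KMS
    CiphertextBlob) so it returns ASCII text suitable for JSON or a
    StringType column."""
    def encrypt_text(value: Any) -> str:
        ciphertext = encrypt_bytes(value)
        # Base64 is reversible: base64.b64decode(text) recovers the original bytes.
        return base64.b64encode(ciphertext).decode("ascii")
    return encrypt_text
```

Usage would be `encrypt_text = make_text_encryptor(encrypt_string)`; to decrypt later, `base64.b64decode(text)` gives back the blob to pass as `CiphertextBlob` to `kms.decrypt`. Separately, since `encrypt_string` creates a new boto3 client on every call, it may be worth creating the client once per executor process instead.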