0

I have a nested schema with a depth of 6 levels. I am facing issues while traversing each element in the schema to modify a column. I have a list which contains the column names which need to be modified (hashed/anonymized).

My initial thought is to traverse each element in the schema, compare the column name with the list items, and modify the value once there is a match. But I do not know how to do it.

List values:['type','name','work','email']

Sample schema:

-- abc: struct (nullable = true)
|    |-- xyz: struct (nullable = true)
|    |    |-- abc123: string (nullable = true)
|    |    |-- services: struct (nullable = true)
|    |    |    |-- service: array (nullable = true)
|    |    |    |    |-- element: struct (contains Null = true)
|    |    |    |    |    |-- type: string (nullable = true)
|    |    |    |    |    |-- subtype: string (nullable = true)
|    |    |    |-- name : string(nullable = true)
|    |    |-- details: struct (nullable =true)
|    |    |    |  -- work: (nullable = true)

Note: If I flatten the schema it creates 600+ columns. Thus I am looking for a solution which modifies the columns dynamically (no hardcoding).

EDIT: In case it helps in any way, I am sharing my code where I am trying to modify the value, but it's broken.

def change_nested_field_value(schema, new_df, fields_to_change, parent=""):
    """Recursively rebuild ``schema``, keeping every field.

    Fixes in this version:
    - The original silently DROPPED any field whose leaf name appeared in
      ``fields_to_change`` (the matching branch was commented out).  A
      hashed/anonymized string is still a string, so the field's type is
      unchanged and the field must stay in the schema.
    - ``nullable`` / ``containsNull`` metadata is now preserved instead of
      being reset to the StructField/ArrayType defaults.

    Parameters
    ----------
    schema : StructType (a leaf DataType on some recursive calls)
        The (sub-)schema to rebuild.
    new_df : DataFrame
        Unused here; kept so the signature stays compatible with callers.
    fields_to_change : collection of str
        Leaf field names whose values are to be hashed/anonymized.
    parent : str
        Dotted path of the enclosing fields, e.g. ``"abc.xyz.services"``.

    Returns
    -------
    StructType mirroring ``schema``.  ``full_field_name`` at the match
    site is the dotted column path the caller needs in order to apply the
    value transformation (e.g. via to_json/from_json or withField).
    """
    # Leaf data types (StringType etc.) are returned unchanged; this also
    # covers arrays of primitives reached through the ArrayType branch.
    if not isinstance(schema, StructType):
        return schema

    new_fields = []
    for field in schema:
        # field.name never contains dots; build the dotted path directly.
        full_field_name = f"{parent}.{field.name}" if parent else field.name

        if field.name in fields_to_change:
            # Match: keep the field as-is.  This is the hook point where
            # the caller transforms the VALUE; the schema does not change.
            new_fields.append(
                StructField(field.name, field.dataType, field.nullable)
            )
        elif isinstance(field.dataType, StructType):
            inner = change_nested_field_value(
                field.dataType, new_df, fields_to_change, full_field_name
            )
            new_fields.append(StructField(field.name, inner, field.nullable))
        elif isinstance(field.dataType, ArrayType):
            inner = change_nested_field_value(
                field.dataType.elementType, new_df, fields_to_change, full_field_name
            )
            new_fields.append(
                StructField(
                    field.name,
                    ArrayType(inner, field.dataType.containsNull),
                    field.nullable,
                )
            )
        else:
            new_fields.append(
                StructField(field.name, field.dataType, field.nullable)
            )

    return StructType(new_fields)
Anup K
  • 43
  • 2
  • 5

1 Answers1

0

You can use the answers from here after converting the struct column into a JSON string using to_json, and finally converting the JSON string back to a struct using from_json. The schema for from_json can be inferred from the original struct.

from typing import Dict, Any, List
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import json

def encrypt(s: str) -> str:
    """Stand-in 'encryption': prefix the value so the change is visible."""
    return "encrypted_" + s

def walk_dict(struct: Dict[str, Any], fields_to_encrypt: List[str]):
    """Encrypt matching string values in-place; recurse into everything else."""
    # Snapshot the keys so mutating values while iterating is safe.
    for key in list(struct):
        value = struct[key]
        if key in fields_to_encrypt and isinstance(value, str):
            struct[key] = encrypt(value)
        else:
            walk_fields(value, fields_to_encrypt)

def walk_fields(field: Any, fields_to_encrypt: List[str]):
    """Dispatch on container type while walking the parsed JSON tree.

    Dicts are handed to ``walk_dict`` (which mutates matching string
    values in place); lists fan the walk out over each element.  Any
    other value is a leaf and is left untouched.
    """
    if isinstance(field, dict):
        walk_dict(field, fields_to_encrypt)
    # A dict is never a list, so elif is equivalent to the two ifs it replaces.
    elif isinstance(field, list):
        # Plain loop instead of a throwaway list comprehension: this call
        # is executed purely for its side effects.
        for element in field:
            walk_fields(element, fields_to_encrypt)
        
def encrypt_fields(json_string: str, fields_to_encrypt: List[str] = None) -> str:
    """Encrypt the configured fields inside a JSON document string.

    Generalized: the field list is now a parameter instead of being
    hard-coded.  Backward compatible — called with a single argument (as
    the Spark UDF does) it encrypts the original ["type", "subtype",
    "work"] set.  ``None`` (not a mutable default) selects that default.

    Parameters
    ----------
    json_string : str
        The struct column serialized with ``to_json``.
    fields_to_encrypt : list of str, optional
        Leaf key names whose string values should be encrypted.

    Returns
    -------
    str: the JSON document with matching values replaced in place.
    """
    if fields_to_encrypt is None:
        fields_to_encrypt = ["type", "subtype", "work"]
    parsed = json.loads(json_string)
    walk_fields(parsed, fields_to_encrypt)
    return json.dumps(parsed)

# Register the row-level JSON rewriter as a string-returning UDF.
field_encryption_udf = F.udf(encrypt_fields, StringType())

# One sample record matching the nested schema from the question.
data = [
    {
        "abc": {
            "xyz": {
                "abc123": "abc123Value",
                "services": {
                    "service": [
                        {
                            "type": "type_element_1",
                            "subtype": "subtype_element_1",
                        },
                        {
                            "type": "type_element_2",
                            "subtype": "subtype_element_2",
                        },
                    ],
                    "name": "nameVal",
                },
                "details": {
                    "work": "workVal",
                },
            }
        }
    },
]

df = spark.read.json(spark.sparkContext.parallelize(data))

# Recover the struct schema of the "abc" column from the DataFrame so
# from_json can rebuild the struct after the UDF has rewritten the JSON.
schema_for_abc_column = StructType.fromJson(
    next(f.jsonValue()["type"] for f in df.schema.fields if f.name == "abc")
)

# Round-trip: struct -> JSON string -> encrypted JSON string -> struct.
df.withColumn(
    "value_changes_column",
    F.from_json(field_encryption_udf(F.to_json("abc")), schema_for_abc_column),
).show(truncate=False)

"""
+-----------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|abc                                                                                                              |value_changes_column                                                                                                                                               |
+-----------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{{abc123Value, {workVal}, {nameVal, [{subtype_element_1, type_element_1}, {subtype_element_2, type_element_2}]}}}|{{abc123Value, {encrypted_workVal}, {nameVal, [{encrypted_subtype_element_1, encrypted_type_element_1}, {encrypted_subtype_element_2, encrypted_type_element_2}]}}}|
+-----------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
"""
Nithish
  • 3,062
  • 2
  • 8
  • 16