
I read a large number of deeply nested JSONs with field names that contain special characters, which cause a lot of trouble.

I would like to rename the characters / and - in field names to underscore _, ideally in PySpark. For example, column a-new should become a_new.

NOTE: there are thousands of field names with special characters, so it has to be done dynamically. If it is easier to just wrap the field names in backquotes, that would also be a solution. The problem I face is that Spark interprets only part of the struct name (a-new as a, etc.).
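
For example, a rough sketch of the quoting problem (I'm assuming standard Spark SQL backtick quoting here; df is the input dataframe shown below):

df.selectExpr("a-new")              # parsed as the expression a minus new -> AnalysisException
df.selectExpr("`a-new`")            # backticks keep the whole name as one identifier
df.selectExpr("`b/old`.`c-red`")    # nested access: every path segment needs its own backticks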

Ref: Rename nested field in spark dataframe

Input df:

root
 |-- a-new: long (nullable = true)
 |-- b/old: struct (nullable = true)
 |    |-- c-red: struct (nullable = true)
 |    |    |-- d/bue: struct (nullable = true)
 |    |    |    |-- e-green: string (nullable = true)
 |    |    |    |-- f-white: struct (nullable = true)
 |    |    |    |    |-- g/blue: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- date: long (nullable = true)
 |    |    |    |    |    |    |-- val: long (nullable = true)

Required outcome:

root
 |-- a_new: long (nullable = true)
 |-- b_old: struct (nullable = true)
 |    |-- c_red: struct (nullable = true)
 |    |    |-- d_bue: struct (nullable = true)
 |    |    |    |-- e_green: string (nullable = true)
 |    |    |    |-- f_white: struct (nullable = true)
 |    |    |    |    |-- g_blue: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- date: long (nullable = true)
 |    |    |    |    |    |    |-- val: long (nullable = true)

I'm wondering if there is a more efficient way than recreating the df with a new schema, as in the solution I found here: https://stackoverflow.com/a/58030523/9579821

json_1 = """{"a-new":1,"b/old":{"c-red":{"d/bue":{"e-green":"label_1","f-white":{"g/blue":[{"date":2020,"val":1}]}}}}}"""
df = spark.read.json(sc.parallelize([json_1]))
df.printSchema()

# Some imports
from pyspark.sql.types import DataType, StructType, ArrayType, StructField
from copy import copy

# We take a dataframe and return a new one with required changes
def clean_df(df):
    # Returns a new sanitized field name (this function can be anything really)
    def sanitizeFieldName(s: str) -> str:
        return s.replace("-", "_").replace("/", "_")
    
    # We call this on all fields to create a copy and to perform any 
    # changes we might want to do to the field.
    def sanitizeField(field: StructField) -> StructField:
        field = copy(field)
        field.name = sanitizeFieldName(field.name)
        # We recursively call cleanSchema on all types
        field.dataType = cleanSchema(field.dataType)
        return field
    
    def cleanSchema(dataType: DataType) -> DataType:
        dataType = copy(dataType)
        # If the type is a StructType we need to recurse otherwise 
        # we can return since we've reached the leaf node
        if isinstance(dataType, StructType):
            # We call our sanitizer for all top level fields
            dataType.fields = [sanitizeField(f) for f in dataType.fields]
        elif isinstance(dataType, ArrayType):
            dataType.elementType = cleanSchema(dataType.elementType)
        return dataType

    # Now since we have the new schema we can create a new DataFrame 
    # by using the old Frame's RDD as data and the new schema as the 
    # schema for the data
   
    return spark.createDataFrame(df.rdd, cleanSchema(df.schema))
   
clean_df(df).printSchema()

1 Answer

import pyspark.sql.functions as F

# Rename top-level columns using `withColumnRenamed`
for c in df.columns:
    df = df.withColumnRenamed(c, c.replace('-', '_').replace('/', '_'))

# Rename nested fields using `cast`: for each (already renamed) column,
# `simpleString()` returns something like "struct<b_old:struct<...>>";
# the slice strips the leading "struct<" plus the column name and ":" as
# well as the trailing ">", leaving the sanitized type string to cast to.
for c in df.columns:
    new_schema = df.select(c).schema.simpleString().replace('-', '_').replace('/', '_')[8 + len(c):-1]
    df = df.withColumn(c, F.col(c).cast(new_schema))
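
Applied to the example df from the question, this should give the required schema without recreating the dataframe from df.rdd (a quick check, assuming the setup above):

df.printSchema()   # expect a_new, b_old, c_red, ... with '-' and '/' replaced by '_' throughout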