I read a large number of deeply nested JSON files whose field names contain special characters, which causes a lot of trouble.
I would like to replace the characters / and - in field names with an underscore _, ideally in PySpark. For example, the column a-new should become a_new.
NOTE: There are thousands of field names with special characters, so the renaming has to be done dynamically. If it is easier to simply wrap the field names in back-quotes, that would also be an acceptable solution. The problem I face is that Spark interprets only part of a struct field name (for example, a-new is read as a).
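To make the back-quote option concrete, here is a minimal sketch of building a quoted path for a nested field. The helper name quote_path is hypothetical, and the sketch assumes that Spark accepts each path segment wrapped in back-quotes, with a literal back-quote inside a name escaped by doubling it.

```python
from typing import Sequence


def quote_path(parts: Sequence[str]) -> str:
    """Wrap every segment of a nested field path in back-quotes.

    Assumption: a back-quote inside a name is escaped by doubling it.
    """
    return ".".join("`" + part.replace("`", "``") + "`" for part in parts)


# Path to the nested field e-green from the input schema
path = quote_path(["b/old", "c-red", "d/bue", "e-green"])
print(path)
```

The resulting string could then be passed to something like df.select(path), although this only helps with reading the fields and does not rename them.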
Ref: Rename nested field in spark dataframe
Input df:
root
|-- a-new: long (nullable = true)
|-- b/old: struct (nullable = true)
| |-- c-red: struct (nullable = true)
| | |-- d/bue: struct (nullable = true)
| | | |-- e-green: string (nullable = true)
| | | |-- f-white: struct (nullable = true)
| | | | |-- g/blue: array (nullable = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- date: long (nullable = true)
| | | | | | |-- val: long (nullable = true)
Required outcome:
root
|-- a_new: long (nullable = true)
|-- b_old: struct (nullable = true)
| |-- c_red: struct (nullable = true)
| | |-- d_bue: struct (nullable = true)
| | | |-- e_green: string (nullable = true)
| | | |-- f_white: struct (nullable = true)
| | | | |-- g_blue: array (nullable = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- date: long (nullable = true)
| | | | | | |-- val: long (nullable = true)
I'm wondering if there is a more efficient way than recreating the df with a new schema, as in the solution I found: https://stackoverflow.com/a/58030523/9579821
json_1 = """{"a-new":1,"b/old":{"c-red":{"d/bue":{"e-green":"label_1","f-white":{"g/blue":[{"date":2020,"val":1}]}}}}}"""
df = spark.read.json(sc.parallelize([json_1]))
df.printSchema()
# Some imports
from pyspark.sql.types import DataType, StructField, StructType, ArrayType
from copy import copy

# We take a dataframe and return a new one with required changes
def clean_df(df):
    # Returns a new sanitized field name (this function can be anything really)
    def sanitizeFieldName(s: str) -> str:
        return s.replace("-", "_").replace("/", "_")

    # We call this on all fields to create a copy and to perform any
    # changes we might want to do to the field.
    def sanitizeField(field: StructField) -> StructField:
        field = copy(field)
        field.name = sanitizeFieldName(field.name)
        # We recursively call cleanSchema on all types
        field.dataType = cleanSchema(field.dataType)
        return field

    def cleanSchema(dataType: DataType) -> DataType:
        dataType = copy(dataType)
        # If the type is a StructType we need to recurse otherwise
        # we can return since we've reached the leaf node
        if isinstance(dataType, StructType):
            # We call our sanitizer for all top level fields
            dataType.fields = [sanitizeField(f) for f in dataType.fields]
        elif isinstance(dataType, ArrayType):
            dataType.elementType = cleanSchema(dataType.elementType)
        return dataType

    # Now since we have the new schema we can create a new DataFrame
    # by using the old Frame's RDD as data and the new schema as the
    # schema for the data
    return spark.createDataFrame(df.rdd, cleanSchema(df.schema))

clean_df(df).printSchema()
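For comparison, the same renaming can be sketched on the JSON form of the schema, which needs no Spark objects and is therefore easy to test in isolation. This is only a sketch under assumptions: the input dict is assumed to have the layout produced by df.schema.jsonValue(), the names sanitize_name and sanitize_type are hypothetical, and the cleaned dict would still have to be turned back into a schema (for instance with StructType.fromJson) and applied to the data, for example with spark.createDataFrame as above. It does not by itself avoid recreating the df.

```python
import re


def sanitize_name(name: str) -> str:
    """Replace - and / in a field name with an underscore."""
    return re.sub(r"[-/]", "_", name)


def sanitize_type(dtype):
    """Return a copy of a schema node with all nested field names sanitized."""
    # Primitive types are plain strings such as "long" or "string".
    if not isinstance(dtype, dict):
        return dtype
    kind = dtype.get("type")
    if kind == "struct":
        fields = [
            {**f, "name": sanitize_name(f["name"]), "type": sanitize_type(f["type"])}
            for f in dtype["fields"]
        ]
        return {**dtype, "fields": fields}
    if kind == "array":
        return {**dtype, "elementType": sanitize_type(dtype["elementType"])}
    if kind == "map":
        return {
            **dtype,
            "keyType": sanitize_type(dtype["keyType"]),
            "valueType": sanitize_type(dtype["valueType"]),
        }
    # Anything else is returned unchanged.
    return dtype


def field(name, dtype):
    """Build one struct field entry in the assumed JSON layout."""
    return {"name": name, "type": dtype, "nullable": True, "metadata": {}}


# A shortened version of the input schema from the question
element = {"type": "struct", "fields": [field("date", "long"), field("val", "long")]}
g_blue = {"type": "array", "elementType": element, "containsNull": True}
example = {
    "type": "struct",
    "fields": [
        field("a-new", "long"),
        field("b/old", {"type": "struct", "fields": [field("g/blue", g_blue)]}),
    ],
}

cleaned = sanitize_type(example)
print([f["name"] for f in cleaned["fields"]])
```

The function builds new dicts instead of mutating its input, so the original schema description stays available for comparison.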