
Say I have a PySpark dataframe df:

>>> df.printSchema()
root
 |-- a: struct
      |-- alpha: integer
      |-- beta: string
      |-- gamma: boolean
 |-- b: string
 |-- c: struct
      |-- delta: string
      |-- epsilon: struct
           |-- omega: string
           |-- psi: boolean

I know I can flatten the dataframe:

select_col_list = [col.replace("a", "a.*").replace("c", "c.*") for col in df.columns]
flat_df = df.select(*select_col_list)

This results in a schema like this:

root
 |-- alpha: integer
 |-- beta: string
 |-- gamma: boolean
 |-- b: string
 |-- delta: string
 |-- epsilon: struct
      |-- omega: string
      |-- psi: boolean

But I also want to prefix the subcolumn names with the parent column's name when I flatten, so I want the resulting schema to be like this:

root
 |-- a_alpha: integer
 |-- a_beta: string
 |-- a_gamma: boolean
 |-- b: string
 |-- c_delta: string
 |-- c_epsilon: struct
      |-- omega: string
      |-- psi: boolean

How do I do this?


2 Answers


I don't think there's a straightforward way to do it, but here's a hacky solution I came up with.

  1. Define a list of the columns to be expanded and create a temporary id column using pyspark.sql.functions.monotonically_increasing_id().
  2. Loop over all the columns in the dataframe and create a temporary dataframe for each one.
  • If the column is in cols_to_expand: Use .* to expand the column. Then rename all fields (except id) in the resulting temporary dataframe with the corresponding prefix using alias().
    • If the column is not in cols_to_expand: Select that column and id and store it in a temporary dataframe.
  3. Store temp_df in a list.
  4. Join all the dataframes in the list using id and drop the id column.

Code:

import pyspark.sql.functions as f
from functools import reduce

df = df.withColumn('id', f.monotonically_increasing_id())
cols_to_expand = ['a', 'c']
flat_dfs = []
for col in df.columns:
    if col in cols_to_expand:
        temp_df = df.select('id', col+".*")
        temp_df = temp_df.select(
            [
                f.col(c).alias(col+"_"+c if c != 'id' else c) for c in temp_df.columns
            ]
        )
    else:
        temp_df = df.select('id', col)

    flat_dfs.append(temp_df)

flat_df = reduce(lambda x, y: x.join(y, on='id'), flat_dfs)

flat_df = flat_df.drop('id')
flat_df.printSchema()

The resulting schema:

flat_df.printSchema()
#root
# |-- a_alpha: integer (nullable = true)
# |-- a_beta: string (nullable = true)
# |-- a_gamma: boolean (nullable = true)
# |-- b: string (nullable = true)
# |-- c_delta: string (nullable = true)
# |-- c_epsilon: struct (nullable = true)
# |    |-- omega: string (nullable = true)
# |    |-- psi: boolean (nullable = true)

I actually found a way to do this today. First, use the beautiful auto-flattening PySpark function by Evan V. Combine this with the rather brilliant mass-renaming solution from proggeo, and you can build up a list of names down the full tree of columns and alias them all as you select.

In my case I took the result of the flatten function and replaced all the "." characters with "_" when renaming. The result is as follows:

from pyspark.sql.types import StructType, ArrayType
from pyspark.sql.functions import col

def flatten(schema, prefix=None):
    """Recursively collect the dotted path of every leaf field in the schema."""
    fields = []
    for field in schema.fields:
        name = prefix + '.' + field.name if prefix else field.name
        dtype = field.dataType
        # For arrays, look at the element type so structs nested inside arrays are expanded too
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType

        if isinstance(dtype, StructType):
            fields += flatten(dtype, prefix=name)
        else:
            fields.append(name)

    return fields

# Get actual field names, with nested '.' structure, and create equivalents with '_'
fields = flatten(df.schema)
fields_renamed = [field.replace(".", "_") for field in fields]

# Select while aliasing for all fields
df = df.select(*[col(field).alias(new_field) for field, new_field in zip(fields, fields_renamed)])
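
For the example dataframe in the question, the intermediate lists and final columns would look along these lines (note that, unlike the first answer, this flattens the nested c.epsilon struct all the way down to its leaves):

print(fields)
# ['a.alpha', 'a.beta', 'a.gamma', 'b', 'c.delta', 'c.epsilon.omega', 'c.epsilon.psi']
print(fields_renamed)
# ['a_alpha', 'a_beta', 'a_gamma', 'b', 'c_delta', 'c_epsilon_omega', 'c_epsilon_psi']
print(df.columns)
# ['a_alpha', 'a_beta', 'a_gamma', 'b', 'c_delta', 'c_epsilon_omega', 'c_epsilon_psi']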