11

I'm trying to drop some nested columns from structs in a Spark dataframe using PySpark.

I found this Scala answer, which seems to do exactly what I want, but I'm not familiar with Scala and don't know how to write it in Python.
https://stackoverflow.com/a/39943812/5706548

ZygD
Pierre

6 Answers

6

Since Spark 3.1, this can be done natively with Column.dropFields:

https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.Column.dropFields.html
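As a rough sketch of how that call might be used (assuming Spark 3.1 or later): dropFields is called on the struct column itself and takes field names relative to that struct. The linked documentation shows dotted names such as "e.g" for fields of a nested struct. The helper names relative_paths and drop_struct_fields are hypothetical, and fields inside an array of structs are not covered by this sketch.

```python
def relative_paths(struct_col, full_paths):
    """Strip a leading "<struct_col>." so each path is relative to the struct column."""
    prefix = struct_col + "."
    return [p[len(prefix):] if p.startswith(prefix) else p for p in full_paths]


def drop_struct_fields(df, struct_col, full_paths):
    """Drop nested fields from `struct_col` with Column.dropFields (Spark >= 3.1)."""
    # Imported lazily so that relative_paths can be used without PySpark installed.
    from pyspark.sql import functions as F

    fields = relative_paths(struct_col, full_paths)
    return df.withColumn(struct_col, F.col(struct_col).dropFields(*fields))


# Usage (needs a running SparkSession and a DataFrame `df` with a struct column "a"):
# df = drop_struct_fields(df, "a", ["a.e", "a.b.d"])
```

Note that the call goes through F.col(...), a Column, and not through the DataFrame.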

blackbishop
Pierre
  • Not working: File "C:\spark-3.1.2-bin-hadoop2.7\python\pyspark\sql\dataframe.py", line 1643, in __getattr__ raise AttributeError( AttributeError: 'DataFrame' object has no attribute 'dropFields' – deathrace Sep 27 '21 at 12:50
  • It's Column.dropFields, not DataFrame.dropFields @deathrace – Pierre Sep 28 '21 at 13:13
4

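Example for PySpark:

from pyspark.sql.functions import struct

def drop_col(df, struct_nm, delete_struct_child_col_nm):
    # Names of all child fields of the struct, minus the one to delete.
    fields_to_keep = filter(lambda x: x != delete_struct_child_col_nm, df.select("{}.*".format(struct_nm)).columns)
    # Qualify each remaining child with the struct name, e.g. "a.b".
    fields_to_keep = list(map(lambda x: "{}.{}".format(struct_nm, x), fields_to_keep))
    # Rebuild the struct from the remaining fields and overwrite the original column.
    return df.withColumn(struct_nm, struct(fields_to_keep))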
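The same idea can be extended to drop several direct children in one pass. The following is only a sketch: kept_field_paths and drop_cols are hypothetical names, and it assumes at least one child field survives, since this sketch does not handle rebuilding a struct from an empty list.

```python
def kept_field_paths(struct_name, child_names, names_to_drop):
    """Return qualified paths ("struct.child") for every child that is kept."""
    drop = set(names_to_drop)
    return ["{}.{}".format(struct_name, name) for name in child_names if name not in drop]


def drop_cols(df, struct_name, names_to_drop):
    """Rebuild `struct_name` without the listed direct children (needs PySpark)."""
    # Imported lazily so that kept_field_paths stays usable without PySpark.
    from pyspark.sql.functions import struct

    children = df.select("{}.*".format(struct_name)).columns
    return df.withColumn(struct_name, struct(kept_field_paths(struct_name, children, names_to_drop)))


# Usage (needs a DataFrame `df` with a struct column "a"):
# df = drop_cols(df, "a", ["e", "h"])
```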
Jorrick Sleijster
Spark-Beginner
3

One method I found in PySpark is to first convert the nested column to JSON and then parse that JSON with a new nested schema from which the unwanted columns have been filtered out.

Suppose I have the following schema and I want to drop d, e and j (a.b.d, a.e, a.h.j) from the dataframe:

root
 |-- a: struct (nullable = true)
 |    |-- b: struct (nullable = true)
 |    |    |-- c: long (nullable = true)
 |    |    |-- d: string (nullable = true)
 |    |-- e: struct (nullable = true)
 |    |    |-- f: long (nullable = true)
 |    |    |-- g: string (nullable = true)
 |    |-- h: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- i: string (nullable = true)
 |    |    |    |-- j: string (nullable = true)
 |-- k: string (nullable = true)

I used the following approach:

  1. Create a new schema for a by excluding d, e and j. A quick way to do this is to manually select the fields you want from df.select("a").schema and build a new schema from them using StructType. Alternatively, you can do it programmatically by traversing the schema tree and excluding the unwanted fields, something like:

    from pyspark.sql.types import ArrayType, StructField, StructType

    def exclude_nested_field(schema, unwanted_fields, parent=""):
        new_schema = []
    
        for field in schema:
            full_field_name = field.name
            if parent:
                full_field_name = parent + "." + full_field_name
    
            if full_field_name not in unwanted_fields:
                if isinstance(field.dataType, StructType):
                    inner_schema = exclude_nested_field(field.dataType, unwanted_fields, full_field_name)
                    new_schema.append(StructField(field.name, inner_schema))
                elif isinstance(field.dataType, ArrayType) and isinstance(field.dataType.elementType, StructType):
                    # Recurse into arrays of structs so that paths such as "h.j" are excluded too.
                    inner_schema = exclude_nested_field(field.dataType.elementType, unwanted_fields, full_field_name)
                    new_schema.append(StructField(field.name, ArrayType(inner_schema)))
                else:
                    # Leaf fields and arrays of non-struct elements are kept unchanged.
                    new_schema.append(StructField(field.name, field.dataType))
    
        return StructType(new_schema)
    
    new_schema = exclude_nested_field(df.schema["a"].dataType, ["b.d", "e", "h.j"])
    
  2. Convert the a column to JSON: .withColumn("json", F.to_json("a")).drop("a")

  3. Parse the JSON-converted a column from step 2 with the new schema from step 1: .withColumn("a", F.from_json("json", new_schema)).drop("json")
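Steps 2 and 3 can be chained into one helper, roughly as sketched below. The function name drop_fields_via_json is hypothetical. The temporary column name "json" is taken from the steps above and is assumed not to clash with an existing column.

```python
def drop_fields_via_json(df, column, new_schema):
    """Round-trip `column` through JSON so that only the fields in `new_schema` remain."""
    # Imported lazily; a running SparkSession is needed when the function is called.
    from pyspark.sql import functions as F

    return (
        df
        # Step 2: serialise the struct column to a JSON string and drop the original.
        .withColumn("json", F.to_json(column)).drop(column)
        # Step 3: parse the JSON back using the reduced schema and drop the temporary column.
        .withColumn(column, F.from_json("json", new_schema)).drop("json")
    )


# Usage, with new_schema built as in step 1:
# df = drop_fields_via_json(df, "a", new_schema)
```

Because the column is dropped and then added again, it ends up as the last column of the result.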

Community
dekauliya
  • I'm trying to use this function but my structure has an array> and I'm getting "TypeError: 'ArrayType' object is not iterable" . Any ideas how to fix this? – Alex Fragotsis Aug 04 '20 at 16:19
  • Spark 3.1.1 now has something to better handle nested fields, so they can be edited or dropped without touching the others: https://germanschiavon.medium.com/spark-3-nested-fields-not-so-nested-anymore-9b8d34b00b95 – Pierre Mar 01 '21 at 09:31
2

Given the dataframe below, the aim is to drop d, e and j.

from pyspark.sql import functions as F
df = spark.createDataFrame([], "a struct<b:struct<c:bigint,d:string>,e:struct<f:bigint,g:string>,h:array<struct<i:string,j:string>>>, k string")
df.printSchema()
# root
#  |-- a: struct (nullable = true)
#  |    |-- b: struct (nullable = true)
#  |    |    |-- c: long (nullable = true)
#  |    |    |-- d: string (nullable = true)      # <<--- to be dropped
#  |    |-- e: struct (nullable = true)           # <<--- to be dropped
#  |    |    |-- f: long (nullable = true)
#  |    |    |-- g: string (nullable = true)
#  |    |-- h: array (nullable = true)
#  |    |    |-- element: struct (containsNull = true)
#  |    |    |    |-- i: string (nullable = true)
#  |    |    |    |-- j: string (nullable = true)  # <<--- to be dropped
#  |-- k: string (nullable = true)

e is the easiest:

df = df.withColumn("a", F.col("a").dropFields("e"))

df.printSchema()
# root
#  |-- a: struct (nullable = true)
#  |    |-- b: struct (nullable = true)
#  |    |    |-- c: long (nullable = true)
#  |    |    |-- d: string (nullable = true)
#  |    |-- h: array (nullable = true)
#  |    |    |-- element: struct (containsNull = true)
#  |    |    |    |-- i: string (nullable = true)
#  |    |    |    |-- j: string (nullable = true)
#  |-- k: string (nullable = true)

In order to drop d, we must go inside b:

df = df.withColumn("a", F.col("a").withField("b", F.col("a.b").dropFields("d")))

df.printSchema()
# root
#  |-- a: struct (nullable = true)
#  |    |-- b: struct (nullable = true)
#  |    |    |-- c: long (nullable = true)
#  |    |-- h: array (nullable = true)
#  |    |    |-- element: struct (containsNull = true)
#  |    |    |    |-- i: string (nullable = true)
#  |    |    |    |-- j: string (nullable = true)
#  |-- k: string (nullable = true)

j is inside an array, so transform must also be used. It "loops" through the array's elements (in this case, each element is a struct) and transforms each one (here, by removing a field).

df = df.withColumn("a", F.col("a").withField(
    "h",
    F.transform(
        F.col("a.h"),
        lambda x: x.dropFields("j")
    )
))

df.printSchema()
# root
#  |-- a: struct (nullable = true)
#  |    |-- b: struct (nullable = true)
#  |    |    |-- c: long (nullable = true)
#  |    |-- h: array (nullable = true)
#  |    |    |-- element: struct (containsNull = true)
#  |    |    |    |-- i: string (nullable = true)
#  |-- k: string (nullable = true)
ZygD
1

Although I have no solution for PySpark, this Scala version may be easy to translate into Python. Consider a dataframe df with schema:

root
 |-- employee: struct (nullable = false)
 |    |-- name: string (nullable = false)
 |    |-- age: integer (nullable = false)

Then if you want e.g. to drop name, you can do:

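import org.apache.spark.sql.functions.struct
import spark.implicits._ // for the $"..." syntax; assumes a SparkSession named spark

val fieldsToKeep = df.select($"employee.*").columns
  .filter(_ != "name") // the nested column you want to drop
  .map(n => "employee." + n)

// overwrite column with subset of fields
df
  .withColumn("employee", struct(fieldsToKeep.head, fieldsToKeep.tail: _*))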
Raphael Roth
0

PySpark version of Raphael's Scala answer.

It operates at a given nesting depth: the result keeps only the fields at that depth that pass the filter, and everything above that level is discarded.

def remove_columns(df, root):
    # root ends in ".*", e.g. "level1.level2.*"; list the child fields at that level.
    cols = df.select(root).columns
    fields_filter = filter(lambda x: x[0] != "$", cols)  # use your own lambda here
    # root[:-1] strips the trailing "*", leaving "level1.level2." as the prefix.
    fields_to_keep = list(map(lambda x: root[:-1] + x, fields_filter))
    return df.select(fields_to_keep)

df = remove_columns(raw_df, root="level1.level2.*")
Community
jabberwocky