
I got this error while using the code below to drop a nested column with PySpark. Why is this not working? I tried using a tilde (~) instead of != as the error suggests, but that doesn't work either. So what do you do in that case?

def drop_col(df, struct_nm, delete_struct_child_col_nm):
    fields_to_keep = filter(lambda x: x != delete_struct_child_col_nm, df.select("{}.*".format(struct_nm)).columns)
    fields_to_keep = list(map(lambda x: "{}.{}".format(struct_nm, x), fields_to_keep))
    return df.withColumn(struct_nm, struct(fields_to_keep))
Barushkish

1 Answer


I built a simple example with a struct column and a few dummy columns:

from pyspark import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id, lit, col, struct
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.getOrCreate()
sql_context = SQLContext(spark.sparkContext)
schema = StructType(
    [
        StructField('addresses',
                    StructType(
                        [StructField("state", StringType(), True),
                         StructField("street", StringType(), True),
                        StructField("country", StringType(), True),
                         StructField("code", IntegerType(), True)]
                    )
                    )
    ]
)

rdd = [({'state': 'pa', 'street': 'market', 'country': 'USA', 'code': 100},),
       ({'state': 'ca', 'street': 'baker',  'country': 'USA', 'code': 101},)]

df = sql_context.createDataFrame(rdd, schema)
df = df.withColumn('id', monotonically_increasing_id())
df = df.withColumn('name', lit('test'))

print(df.show())
print(df.printSchema())

Output:

+--------------------+-----------+----+
|           addresses|         id|name|
+--------------------+-----------+----+
|[pa, market, USA,...| 8589934592|test|
|[ca, baker, USA, ...|25769803776|test|
+--------------------+-----------+----+

root
 |-- addresses: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- code: integer (nullable = true)
 |-- id: long (nullable = false)
 |-- name: string (nullable = false)

To drop the whole struct column, you can simply use the drop function:

df2 = df.drop('addresses')
print(df2.show())

Output:

+-----------+----+
|         id|name|
+-----------+----+
| 8589934592|test|
|25769803776|test|
+-----------+----+

To drop specific fields in a struct column, it's a bit more complicated; there are some other, similar questions that cover it.

In any case, I found them to be a bit complicated - my approach would just be to reassign the original column with the subset of struct fields you want to keep:

columns_to_keep = ['country', 'code']

df = df.withColumn('addresses', struct(*[f"addresses.{column}" for column in columns_to_keep]))

Output:

+----------+-----------+----+
| addresses|         id|name|
+----------+-----------+----+
|[USA, 100]| 8589934592|test|
|[USA, 101]|25769803776|test|
+----------+-----------+----+
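
Note that the f-string syntax above requires Python 3.6+. On older versions, the same reassignment can be written with str.format; a minimal equivalent sketch:

columns_to_keep = ['country', 'code']
# Same reassignment as above, but using str.format so it also runs on Python < 3.6
df = df.withColumn('addresses', struct(*["addresses.{}".format(column) for column in columns_to_keep]))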

Alternatively, if you just want to specify the columns to remove rather than the columns to keep:

columns_to_remove = ['country', 'code']
all_columns = df.select("addresses.*").columns
columns_to_keep = list(set(all_columns) - set(columns_to_remove))
df = df.withColumn('addresses', struct(*[f"addresses.{column}" for column in columns_to_keep]))

Output:

+------------+-----------+----+
|   addresses|         id|name|
+------------+-----------+----+
|[pa, market]| 8589934592|test|
| [ca, baker]|25769803776|test|
+------------+-----------+----+
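
If you'd like to wrap this into a reusable helper, similar to the drop_col function in your question, here's a minimal sketch (the name drop_struct_field is just for illustration; it assumes Spark 2.x+ and Python 3.6+):

from pyspark.sql.functions import struct

def drop_struct_field(df, struct_name, field_to_drop):
    # Keep every field of the struct except the one being dropped
    fields_to_keep = [
        f"{struct_name}.{field}"
        for field in df.select(f"{struct_name}.*").columns
        if field != field_to_drop
    ]
    # Reassign the struct column using only the remaining fields
    return df.withColumn(struct_name, struct(*fields_to_keep))

df = drop_struct_field(df, 'addresses', 'code')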

Hope this helps!

danielcahall
  • Thanks for the quick answer. Actually this doesn't entirely work for me. I get an error of "invalid syntax" regarding this line "df = df.withColumn('addresses', struct(*[f"addresses.{column}" for column in columns_to_keep]))". – Barushkish Apr 11 '20 at 12:16
  • Hm what kind of syntax error? If you're using a version of Python below 3.6, you may not have f-strings (https://www.python.org/dev/peps/pep-0498/) so you would have to use another method of formatting the strings i.e; `["addresses." + column for column in columns_to_keep]` – danielcahall Apr 11 '20 at 12:33
  • Also as a side note, I realized the conversion to a list in this line `columns_to_keep = list(set(all_columns) - set(columns_to_remove))` is unnecessary - you can keep it as a set since we're just iterating over it in the next line and there won't be duplicates: `columns_to_keep = set(all_columns) - set(columns_to_remove)` – danielcahall Apr 11 '20 at 12:35
  • Thanks. Indeed I got it fixed already by changing this line "df = df.withColumn('addresses', struct(*[f"addresses.{column}" for column in columns_to_keep]))" to a line which is in the code quoted in my question, "fields_to_keep = list(map(lambda x: "{}.{}".format(struct_nm, x), fields_to_keep))", and this did work. Though your current solution looks even easier. Not sure about my version of Python; could be this causes the errors. – Barushkish Apr 11 '20 at 14:46
  • Ah yeah that will fix it too - it's just a matter of changing the way you build the formatted string, which could be through ‘+’ or the ‘format’ function. In Python 3.6 onwards, strings can also be formatted using the method I used, which are called f-strings. Glad it's all working though! – danielcahall Apr 11 '20 at 16:56