-3

I would like to add a nested object ("struct") to a pySpark dataframe and write this out to parquet. I want to recreate the following (currently prepared using Scala spark + udf (How to add a new Struct column to a DataFrame)):

 |-- _level1: struct (nullable = true)
 |    |-- level2a: struct (nullable = true)
 |    |    |-- fielda: string (nullable = true)
 |    |    |-- fieldb: string (nullable = true)
 |    |    |-- fieldc: string (nullable = true)
 |    |    |-- fieldd: string (nullable = true)
 |    |    |-- fielde: string (nullable = true)
 |    |    |-- fieldf: string (nullable = true)
 |    |-- level2b: struct (nullable = true)
 |    |    |-- fielda: string (nullable = true)
 |    |    |-- fieldb: string (nullable = true)
 |    |    |-- fieldc: string (nullable = true)

What is the best way to do this?

Oliver W.
  • 13,169
  • 3
  • 37
  • 50
pwwolff
  • 606
  • 1
  • 5
  • 20
  • Yes of course, I tried various methods. I've since figured out something that works and posted below. – pwwolff Nov 30 '19 at 19:35

2 Answers2

1

I have figured out to do what I wanted. The idea is to create the schema for the nested column (struct) as follows:

from pyspark.sql.functions import lit, udf
from pyspark.sql.types import StringType, StructField, StructType

schema = StructType([
            StructField('level2a',
                        StructType(
                            [
                                StructField('fielda', StringType(), nullable=False),
                                StructField('fieldb', StringType(), nullable=False),
                                StructField('fieldc', StringType(), nullable=False),
                                StructField('fieldd', StringType(), nullable=False),
                                StructField('fielde', StringType(), nullable=False),
                                StructField('fieldf', StringType(), nullable=False)
                            ])
                        ),
            StructField('level2b',
                        StructType(
                            [
                                StructField('fielda', StringType(), nullable=False),
                                StructField('fieldb', StringType(), nullable=False),
                                StructField('fieldc', StringType(), nullable=False)
                            ])
                        )
        ])

This can then be used in conjunction with a udf (which takes the above schema as a parameter) to get the desired result.


def make_meta(fielda, fieldb, fieldc, fieldd, fielde, fieldf, fieldalvl2, fieldblvl2, fieldclvl2):
    return [
        [fielda, fieldb, fieldc, fieldd, fielde, fieldf],
        [fieldalvl2, fieldblvl2, fieldclvl2]
    ]

test_udf = udf(lambda fielda,
               fieldb,
               fieldc,
               fieldd,
               fieldf,
               fielde,
               fieldalvl2, fieldblvl2, fieldclvl2:
               make_meta(fielda,
               fieldb,
               fieldc,
               fieldd,
               fieldf,
               fielde, fieldalvl2, fieldblvl2, fieldclvl2),
               schema)

df = spark.range(0, 5)
df.withColumn("test", test_udf(lit("a"), lit("b"), lit("c"),lit("d"),lit("e"),lit("f"),lit("a"),lit("b"),lit("c"))).printSchema()

Prints the following:

root
 |-- id: long (nullable = false)
 |-- test: struct (nullable = true)
 |    |-- level2a: struct (nullable = true)
 |    |    |-- fielda: string (nullable = false)
 |    |    |-- fieldb: string (nullable = false)
 |    |    |-- fieldc: string (nullable = false)
 |    |    |-- fieldd: string (nullable = false)
 |    |    |-- fielde: string (nullable = false)
 |    |    |-- fieldf: string (nullable = false)
 |    |-- level2b: struct (nullable = true)
 |    |    |-- fielda: string (nullable = false)
 |    |    |-- fieldb: string (nullable = false)
 |    |    |-- fieldc: string (nullable = false)

In scala it is possible to return an instance of a case class from a udf, which was what I was trying to do in python (ie. return an object)

pwwolff
  • 606
  • 1
  • 5
  • 20
1

If you want to nest columns, you could use the struct function. This will be more efficient than using a user defined function (udf), as the operations take place directly on the Java Virtual Machine.

Here’s an example:

In [1]: from pyspark.sql.functions import struct, col
   ...: 
   ...: df = spark.createDataFrame([(list("abcdefABC"))],
   ...:                            schema=list("abcdefghi")
   ...:                            )
   ...: df2 = df.select(
   ...:     struct(
   ...:         struct(*(col(_).alias("field%s" % _) for _ in "abcdef")).alias("level2a"),
   ...:         struct(*(col(_).alias("field%s" % (chr(ord(_) - 6))) for _ in ("ghi"))).alias("level2b")
   ...:     ).alias("_level1")
   ...: )
   ...: 
   ...: df2.printSchema()
   ...: 
   ...: 
root
 |-- _level1: struct (nullable = false)
 |    |-- level2a: struct (nullable = false)
 |    |    |-- fielda: string (nullable = true)
 |    |    |-- fieldb: string (nullable = true)
 |    |    |-- fieldc: string (nullable = true)
 |    |    |-- fieldd: string (nullable = true)
 |    |    |-- fielde: string (nullable = true)
 |    |    |-- fieldf: string (nullable = true)
 |    |-- level2b: struct (nullable = false)
 |    |    |-- fielda: string (nullable = true)
 |    |    |-- fieldb: string (nullable = true)
 |    |    |-- fieldc: string (nullable = true)

There’s a bit of string math going on in here (chr to take the Unicode symbol at a certain index, ord to take a symbol’s code point) to prevent repetition (struct(col("a").alias("fielda"), col("b").alias("fieldb"), …)), but the main message is: use struct to create a new structured colum from other columns.

Oliver W.
  • 13,169
  • 3
  • 37
  • 50