2

I use: Python 3.6 and PySpark 2.3.0. In the following exaple I have only tow items in item but also I can have more information like first_name, last_name, city.

I have a data frame with the following schema:

|-- email: string (nullable = true)
| -- item: struct(nullable=true)
| | -- item: array(nullable=true)
| | | -- element: struct(containsNull=true)
| | | | -- data: string(nullable=true)
| | | | -- fieldid: string(nullable=true)
| | | | -- fieldname: string(nullable=true)
| | | | -- fieldtype: string(nullable=true)

This is my output:

+-----+-----------------------------------------------------------------------------------------+
|email|item                                                                                     |
+-----+-----------------------------------------------------------------------------------------+
|x    |[[[Gmail, 32, Email Client, dropdown], [Device uses Proxy Server, 33, Device, dropdown]]]|
|y    |[[[IE, 32, Email Client, dropdown], [Personal computer, 33, Device, dropdown]]]          |
+-----+-----------------------------------------------------------------------------------------+

I want to transform this data frame to:

+-----+-------------------------------------+
|email|Email Client|Device                  |
+-----+-------------------------------------+
|x    |Gmail       |Device uses Proxy Server|
|y    |IE          |Personal computer       |
+-----+-------------------------------------+

I do some transformations:

df = df.withColumn('item', df.item.item)
df = df.withColumn('column_names', df.item.fieldname)
df = df.withColumn('column_values', df.item.data)

And now my output is:

+-----+----------------------+---------------------------------+
|email|column_names          |column_values                    |
+-----+----------------------+---------------------------------+
|x    |[Email Client, Device]|[Gmail, Device uses Proxy Server]|
|y    |[Email Client, Device]|[IE, Personal computer]          |
+-----+----------------------+---------------------------------+

From here I want a method how to zip these columns.

Sn0pY
  • 347
  • 4
  • 16

1 Answers1

4

You asked how to zip the arrays, but you can actually get to your desired output without the intermediate steps of creating the column_names and column_values columns.

Use the getItem() function to grab the desired values by index:

import pyspark.sql.functions as f
df = df.select(
    'email',
    f.col('item.data').getItem(0).alias('Email Client'),
    f.col('item.data').getItem(1).alias('Device')
)
df.show(truncate=False)
#+-----+------------+------------------------+
#|email|Email Client|Device                  |
#+-----+------------+------------------------+
#|x    |Gmail       |Device uses Proxy Server|
#|y    |IE          |Personal computer       |
#+-----+------------+------------------------+

This assumes that the Email Client field is always at index 0 and Device is at index 1.


If you can't assume that the fields are always in the same order in each row, another option is to create a map from the values in the column_names and column_values using pyspark.sql.functions.create_map().

This function takes takes a:

list of column names (string) or list of Column expressions that [are] grouped as key-value pairs, e.g. (key1, value1, key2, value2, ...).

We iterate over the items in column_names and column_values to create a list of the pairs, and then use list(chain.from_iterable(...)) to flatten the list.

After the list is made, you can select the field by name.

from itertools import chain

# first create a map type column called 'map'
df.select(
    'email',
    f.create_map(
        list(
            chain.from_iterable(
                [[f.col('column_names').getItem(i), f.col('column_values').getItem(i)] 
                 for i in range(2)]
            )
        )
    ).alias('map')
)
df.show(truncte=False)
#+-----+--------------------------------------------------------------+
#|email|map                                                           |
#+-----+--------------------------------------------------------------+
#|x    |Map(Email Client -> Gmail, Device -> Device uses Proxy Server)|
#|y    |Map(Email Client -> IE, Device -> Personal computer)          |
#+-----+--------------------------------------------------------------+

# now select the fields by key
df = df.select(
    'email',
    f.col('map').getField("Email Client").alias("Email Client"),
    f.col('map').getField("Device").alias("Device")
)

This assumes that there will always be at least 2 elements in each array.


If you wanted to zip lists of arbitrary length, you would have to use a udf.

# define the udf
zip_lists = f.udf(lambda x, y: [list(z) for z in zip(x, y)], ArrayType(StringType()))

# use the udf to zip the lists
df.select(
    'email',
    zip_lists(f.col('column_names'), f.col('column_values')).alias('zipped')
).show(truncate=False)
#+-----+-----------------------------------------------------------+
#|email|zipped                                                     |
#+-----+-----------------------------------------------------------+
#|x    |[[Email Client, Gmail], [Device, Device uses Proxy Server]]|
#|y    |[[Email Client, IE], [Device, Personal computer]]          |
#+-----+-----------------------------------------------------------+

Or you could use a udf to create the map:

make_map = f.udf(lambda x, y: dict(zip(x, y)), MapType(StringType(), StringType()))
df.select(
    'email',
    make_map(f.col('column_names'), f.col('column_values')).alias('map')
).show(truncate=False)
#+-----+--------------------------------------------------------------+
#|email|map                                                           |
#+-----+--------------------------------------------------------------+
#|x    |Map(Device -> Device uses Proxy Server, Email Client -> Gmail)|
#|y    |Map(Device -> Personal computer, Email Client -> IE)          |
#+-----+--------------------------------------------------------------+
pault
  • 41,343
  • 15
  • 107
  • 149