
I have the following, nastily formatted, input data frame:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").getOrCreate()

input_df = spark.createDataFrame(
    [
        ('Alice;Bob;Carol',),
        ('12;13;14',),
        ('5;;7',),
        ('1;;3',),
        (';;3',)
    ],
    ['data']
)
  
input_df.show()

# +---------------+
# |           data|
# +---------------+
# |Alice;Bob;Carol|
# |       12;13;14|
# |           5;;7|
# |           1;;3|
# |            ;;3|
# +---------------+

The actual input is a semicolon-separated CSV file where each column contains the values for one person. Each person can have a different number of values. Here, Alice has three values, Bob has only one, and Carol has four.

I would like to transform it within PySpark to an output data frame that holds an array per person, in this example the output would be:

result = spark.createDataFrame(
    [
        ("Alice", [12, 5, 1]),
        ("Bob", [13,]),
        ("Carol", [14, 7, 3, 3])
    ],
    ['name', 'values']
)

result.show()

# +-----+-------------+
# | name|       values|
# +-----+-------------+
# |Alice|   [12, 5, 1]|
# |  Bob|         [13]|
# |Carol|[14, 7, 3, 3]|
# +-----+-------------+

How would I do this? I'm thinking it will be some combination of F.arrays_zip(), F.split() and/or F.explode(), but I can't figure it out.

I'm currently stuck; this is my attempt so far:

(input_df
    .withColumn('splits', F.split(F.col('data'), ';'))
    .drop('data')
).show()

# +-------------------+
# |             splits|
# +-------------------+
# |[Alice, Bob, Carol]|
# |       [12, 13, 14]|
# |           [5, , 7]|
# |           [1, , 3]|
# |            [, , 3]|
# +-------------------+
Alexander Engelhardt

3 Answers


One approach is to read the first line as the header and then unpivot the data:

import pyspark.sql.functions as f

df1 = spark.createDataFrame([(12, 13, 14), (5, None, 7), (1, None, 3), (None, None, 3)], ['Alice', 'Bob', 'Carol'])

df1.show()
+-----+----+-----+
|Alice| Bob|Carol|
+-----+----+-----+
|   12|  13|   14|
|    5|null|    7|
|    1|null|    3|
| null|null|    3|
+-----+----+-----+
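
In the actual use case, df1 can come straight from the ;-separated file instead of createDataFrame. A minimal sketch, assuming a hypothetical path data.csv (depending on how empty fields are handled, you may need to cast the columns afterwards):

df1 = (spark.read
       .option("sep", ";")
       .option("header", "true")
       .option("inferSchema", "true")
       .csv("data.csv"))  # hypothetical path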

df1.select(f.expr('''stack(3,'Alice',Alice,'Bob',Bob,'Carol',Carol) as (Name,Value)'''))\
   .groupBy('Name').agg(f.collect_list('value').alias('Value')).orderBy('Name').show()

+-----+-------------+
| Name|        Value|
+-----+-------------+
|Alice|   [12, 5, 1]|
|  Bob|         [13]|
|Carol|[14, 7, 3, 3]|
+-----+-------------+
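
Here stack(3, 'k1', v1, 'k2', v2, 'k3', v3) unpivots each row into three (Name, Value) rows, and collect_list then drops the nulls, which is why Bob ends up with just [13]. The intermediate result before the groupBy looks roughly like this (a sketch; row order is not guaranteed):

df1.select(f.expr('''stack(3,'Alice',Alice,'Bob',Bob,'Carol',Carol) as (Name,Value)''')).show()

+-----+-----+
| Name|Value|
+-----+-----+
|Alice|   12|
|  Bob|   13|
|Carol|   14|
|Alice|    5|
|  Bob| null|
|Carol|    7|
|Alice|    1|
|  Bob| null|
|Carol|    3|
|Alice| null|
|  Bob| null|
|Carol|    3|
+-----+-----+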

To pass the columns dynamically, use the code below:

cols = ','.join([f"'{c}',{c}" for c in df1.columns])
df1.select(f.expr(f'''stack({len(df1.columns)},{cols}) as (Name,Value)'''))\
   .groupBy('Name').agg(f.collect_list('value').alias('Value')).orderBy('Name').show()

+-----+-------------+
| Name|        Value|
+-----+-------------+
|Alice|   [12, 5, 1]|
|  Bob|         [13]|
|Carol|[14, 7, 3, 3]|
+-----+-------------+
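
For reference, the generated cols string for these three columns expands to:

print(cols)
# 'Alice',Alice,'Bob',Bob,'Carol',Carol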
Shubham Jain

Solution for Spark 2.4+:

  • Use groupBy with collect_list to get all the rows into one row, then split to create one array column per original input row.
  • Use arrays_zip to zip those arrays and create a nested array [key, [values]].
  • Finally, explode the nested array.

Example:

df.show()
#+---------------+
#|           data|
#+---------------+
#|Alice;Bob;Carol|
#|       12;13;14|
#|           5;;7|
#|           1;;3|
#|            ;;3|
#+---------------+
from pyspark.sql.functions import *

df.agg(split(concat_ws("|",collect_list(col("data"))),"\\|").alias("tmp")).\
withColumn("col1",split(element_at(col("tmp"),1),";")).\
withColumn("col2",split(element_at(col("tmp"),2),";")).\
withColumn("col3",split(element_at(col("tmp"),3),";")).\
withColumn("col4",split(element_at(col("tmp"),4),";")).\
withColumn("zip",arrays_zip(col("col1"),arrays_zip(col("col2"),col("col3"),col("col4")))).\
selectExpr("explode(zip)as tmp").\
selectExpr("tmp.*").\
toDF("name","values").\
show(10,False)

#+-----+----------+
#|name |values    |
#+-----+----------+
#|Alice|[12, 5, 1]|
#|Bob  |[13, , ]  |
#|Carol|[14, 7, 3]|
#+-----+----------+

For Spark < 2.4, use a udf in place of the arrays_zip function and getItem(<n>) instead of element_at.
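
A minimal sketch of such a udf, assuming two string arrays zipped element-wise into an array of structs (the names zip_arrays and zip_udf are made up for illustration; the nested zip above would need a variant taking more arrays):

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Hypothetical stand-in for arrays_zip on Spark < 2.4: pad the shorter
# array with None, then zip the two arrays element-wise.
def zip_arrays(a, b):
    a, b = a or [], b or []
    n = max(len(a), len(b))
    a = a + [None] * (n - len(a))
    b = b + [None] * (n - len(b))
    return list(zip(a, b))

zip_udf = udf(zip_arrays, ArrayType(StructType([
    StructField("col1", StringType()),
    StructField("col2", StringType()),
])))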

notNull
  • Thanks very much! In my original data set, I have hundreds of columns, i.e. hundreds of names like Alice, Bob, and Carol. Is there a way to loop all the rows that create "col1", "col2" etc. in your answer? – Alexander Engelhardt Jun 23 '20 at 15:08
  • @AlexanderEngelhardt, For dynamically create **element_at** columns you can use the similar method as mentioned here: https://stackoverflow.com/questions/61757408/pyspark-adding-columns-from-a-list https://stackoverflow.com/questions/48134478/how-to-add-columns-in-pyspark-dataframe-dynamically – notNull Jun 23 '20 at 15:50
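
Following up on the comment above about hundreds of names, a rough sketch of generating the colN columns in a loop instead of by hand (assuming the number of input rows is small enough to count and collect into one row):

from pyspark.sql.functions import col, collect_list, concat_ws, element_at, split

# Hypothetical loop: one split column per original input row.
n_rows = df.count()
tmp_df = df.agg(split(concat_ws("|", collect_list(col("data"))), "\\|").alias("tmp"))
for i in range(1, n_rows + 1):
    tmp_df = tmp_df.withColumn(f"col{i}", split(element_at(col("tmp"), i), ";"))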

I would suggest reading the data as a ;-separated CSV and then processing it to get the name and values columns as below.

Please note that this code is written in Scala, but similar code can be implemented in PySpark with minimal changes.

Load the ;-separated CSV

    import spark.implicits._                 // needed for .toDS()
    import org.apache.spark.sql.functions._  // expr, array, collect_list

    val data =
      """
        |Alice;Bob;Carol
        |       12;13;14
        |           5;;7
        |           1;;3
        |            ;;3
      """.stripMargin
    val stringDS = data.split(System.lineSeparator())
      .map(_.split("\\;").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(";"))
      .toSeq.toDS()
    val df = spark.read
      .option("sep", ";")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS)
    df.printSchema()
    df.show(false)
    /**
      * root
      * |-- Alice: integer (nullable = true)
      * |-- Bob: integer (nullable = true)
      * |-- Carol: integer (nullable = true)
      *
      * +-----+----+-----+
      * |Alice|Bob |Carol|
      * +-----+----+-----+
      * |12   |13  |14   |
      * |5    |null|7    |
      * |1    |null|3    |
      * |null |null|3    |
      * +-----+----+-----+
      */

Derive the name and values columns


    val columns = df.columns.map(c => expr(s"named_struct('name', '$c', 'values',  collect_list($c))"))
    df.select(array(columns: _*).as("array"))
      .selectExpr("inline_outer(array)")
      .show(false)
    /**
      * +-----+-------------+
      * |name |values       |
      * +-----+-------------+
      * |Alice|[12, 5, 1]   |
      * |Bob  |[13]         |
      * |Carol|[14, 7, 3, 3]|
      * +-----+-------------+
      */
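
Since the question is in PySpark, a rough sketch of the same named_struct + inline_outer idea in Python (assuming df has been read with sep=';', header=True and inferSchema=True as above):

import pyspark.sql.functions as F

# One struct per person, then inline_outer turns the array of structs into rows.
cols = [F.expr(f"named_struct('name', '{c}', 'values', collect_list({c}))") for c in df.columns]
(df.select(F.array(*cols).alias("array"))
   .selectExpr("inline_outer(array)")
   .show(truncate=False))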
Som