You can achieve this using a series of explode() and groupby() operations.
Sample dataset:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()
schema = StructType([
StructField("col1", StringType()),
StructField("col2", ArrayType(ArrayType(StructType([StructField("col3", StringType())]))))
])
This corresponds to the following schema tree:
root
|-- col1: string (nullable = true)
|-- col2: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: struct (containsNull = true)
| | | |-- col3: string (nullable = true)
data = [("a_col1", [[("p_col3",),("q_col3",)]]),
("b_col1", [[("r_col3",),("s_col3",)]])]
df = spark.createDataFrame(data=data, schema=schema)
df.show(truncate=False)
+------+----------------------+
|col1 |col2 |
+------+----------------------+
|a_col1|[[{p_col3}, {q_col3}]]|
|b_col1|[[{r_col3}, {s_col3}]]|
+------+----------------------+
Since there are two nested levels of arrays, two explode() operations are required to unroll the data:
df = df.withColumn("col2_outer_expand", F.explode("col2"))
+------+----------------------+--------------------+
|col1 |col2 |col2_outer_expand |
+------+----------------------+--------------------+
|a_col1|[[{p_col3}, {q_col3}]]|[{p_col3}, {q_col3}]|
|b_col1|[[{r_col3}, {s_col3}]]|[{r_col3}, {s_col3}]|
+------+----------------------+--------------------+
df = df.withColumn("col2_inner_expand", F.explode("col2_outer_expand"))
+------+----------------------+--------------------+-----------------+
|col1 |col2 |col2_outer_expand |col2_inner_expand|
+------+----------------------+--------------------+-----------------+
|a_col1|[[{p_col3}, {q_col3}]]|[{p_col3}, {q_col3}]|{p_col3} |
|a_col1|[[{p_col3}, {q_col3}]]|[{p_col3}, {q_col3}]|{q_col3} |
|b_col1|[[{r_col3}, {s_col3}]]|[{r_col3}, {s_col3}]|{r_col3} |
|b_col1|[[{r_col3}, {s_col3}]]|[{r_col3}, {s_col3}]|{s_col3} |
+------+----------------------+--------------------+-----------------+
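Note that explode() silently drops rows whose array is null or empty. If such rows need to be preserved, explode_outer() can be used instead for either level, for example:
df = df.withColumn("col2_outer_expand", F.explode_outer("col2"))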
Get the col3 field from the struct/tuple:
df = df.withColumn("col3", F.col("col2_inner_expand").getField("col3"))
+------+----------------------+--------------------+-----------------+------+
|col1 |col2 |col2_outer_expand |col2_inner_expand|col3 |
+------+----------------------+--------------------+-----------------+------+
|a_col1|[[{p_col3}, {q_col3}]]|[{p_col3}, {q_col3}]|{p_col3} |p_col3|
|a_col1|[[{p_col3}, {q_col3}]]|[{p_col3}, {q_col3}]|{q_col3} |q_col3|
|b_col1|[[{r_col3}, {s_col3}]]|[{r_col3}, {s_col3}]|{r_col3} |r_col3|
|b_col1|[[{r_col3}, {s_col3}]]|[{r_col3}, {s_col3}]|{s_col3} |s_col3|
+------+----------------------+--------------------+-----------------+------+
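Equivalently, the nested field can be addressed with dot notation instead of getField():
df = df.withColumn("col3", F.col("col2_inner_expand.col3"))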
Create a new col4 column alongside col3 (the intermediate explode columns are no longer needed and can be dropped):
df = df.withColumn("col4", F.concat(F.substring("col3", 0, 1), F.lit("_col4")))
+------+----------------------+------+------+
|col1 |col2 |col3 |col4 |
+------+----------------------+------+------+
|a_col1|[[{p_col3}, {q_col3}]]|p_col3|p_col4|
|a_col1|[[{p_col3}, {q_col3}]]|q_col3|q_col4|
|b_col1|[[{r_col3}, {s_col3}]]|r_col3|r_col4|
|b_col1|[[{r_col3}, {s_col3}]]|s_col3|s_col4|
+------+----------------------+------+------+
Now do all of the above operations in reverse.
Combine col3 and col4 into a struct/tuple:
df = df.withColumn("col2_inner_group", F.struct("col3", "col4"))
+------+----------------------+------+------+----------------+
|col1 |col2 |col3 |col4 |col2_inner_group|
+------+----------------------+------+------+----------------+
|a_col1|[[{p_col3}, {q_col3}]]|p_col3|p_col4|{p_col3, p_col4}|
|a_col1|[[{p_col3}, {q_col3}]]|q_col3|q_col4|{q_col3, q_col4}|
|b_col1|[[{r_col3}, {s_col3}]]|r_col3|r_col4|{r_col3, r_col4}|
|b_col1|[[{r_col3}, {s_col3}]]|s_col3|s_col4|{s_col3, s_col4}|
+------+----------------------+------+------+----------------+
Roll the data back up twice, as there are two nested levels:
df = df.groupby("col1").agg(F.collect_list("col2_inner_group").alias("col2_outer_group"))
+------+------------------------------------+
|col1 |col2_outer_group |
+------+------------------------------------+
|a_col1|[{p_col3, p_col4}, {q_col3, q_col4}]|
|b_col1|[{r_col3, r_col4}, {s_col3, s_col4}]|
+------+------------------------------------+
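Be aware that collect_list() does not guarantee the order of the collected elements. If a deterministic order matters, one option (a sketch, not required for this small example) is to sort the resulting array; sort_array() orders structs field by field, so here it sorts by col3:
df = df.groupby("col1").agg(F.sort_array(F.collect_list("col2_inner_group")).alias("col2_outer_group"))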
df = df.groupby("col1").agg(F.collect_list("col2_outer_group").alias("col2"))
+------+--------------------------------------+
|col1 |col2 |
+------+--------------------------------------+
|a_col1|[[{p_col3, p_col4}, {q_col3, q_col4}]]|
|b_col1|[[{r_col3, r_col4}, {s_col3, s_col4}]]|
+------+--------------------------------------+
And finally you have the new column added in the desired place.
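For reference, the same pipeline can be written as a single chained expression (a sketch, starting again from the freshly created df; the name result is just for illustration):
df = spark.createDataFrame(data=data, schema=schema)
result = (df
    .withColumn("col2_outer_expand", F.explode("col2"))
    .withColumn("col2_inner_expand", F.explode("col2_outer_expand"))
    .withColumn("col3", F.col("col2_inner_expand.col3"))
    .withColumn("col4", F.concat(F.substring("col3", 1, 1), F.lit("_col4")))
    .withColumn("col2_inner_group", F.struct("col3", "col4"))
    .groupby("col1")
    .agg(F.collect_list("col2_inner_group").alias("col2_outer_group"))
    .groupby("col1")
    .agg(F.collect_list("col2_outer_group").alias("col2")))
result.show(truncate=False)
result.printSchema()  # col2 is again array<array<struct<col3, col4>>>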