I have a data frame with schema as below:

root
|-- col1
|-- col2:array (nullable = true)
|   |-- element: array (containsNull = true)
|   |   |-- element: struct (containsNull = true)
|   |   |   |--col3
|   |   |   |--< need to add new column e.g col4 >

How can I add the new column col4 using PySpark (Spark version 2.4)?

blackbishop
Sonam
  • Does this answer your question? [How do I add a column to a nested struct in a PySpark dataframe?](https://stackoverflow.com/questions/48777993/how-do-i-add-a-column-to-a-nested-struct-in-a-pyspark-dataframe) – werner Oct 06 '22 at 13:20
  • This did not help. In my case, the nested column is of array type, not struct type. – Sonam Oct 06 '22 at 13:34
  • I think one possible answer is to explode the inner lists, make your changes, and collect_list again. I can't tell what col4 is or what its schema looks like. Is it a static value for every inner list, or is it dynamic? – Amir Hossein Shahdaei Oct 06 '22 at 15:06
  • @Dawyi col4 is a new column of type String that needs to be added in the position mentioned in my question. – Sonam Oct 06 '22 at 16:02
  • Please provide code to construct a dataset for the above schema, along with a sample of the expected output. – Azhar Khan Oct 10 '22 at 05:58

1 Answer

You can achieve this using a series of explode() and groupby() operations.

Sample dataset:

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

schema = StructType([
  StructField("col1", StringType()),
  StructField("col2", ArrayType(ArrayType(StructType([StructField("col3", StringType())]))))
  ])

root
 |-- col1: string (nullable = true)
 |-- col2: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- col3: string (nullable = true)


data = [("a_col1", [[("p_col3",),("q_col3",)]]),
       ("b_col1", [[("r_col3",),("s_col3",)]])]
df = spark.createDataFrame(data=data, schema=schema)
+------+----------------------+
|col1  |col2                  |
+------+----------------------+
|a_col1|[[{p_col3}, {q_col3}]]|
|b_col1|[[{r_col3}, {s_col3}]]|
+------+----------------------+

Since there are two nested levels of arrays, two explode() calls are required to unroll the data:

df = df.withColumn("col2_outer_expand", F.explode("col2"))
+------+----------------------+--------------------+
|col1  |col2                  |col2_outer_expand   |
+------+----------------------+--------------------+
|a_col1|[[{p_col3}, {q_col3}]]|[{p_col3}, {q_col3}]|
|b_col1|[[{r_col3}, {s_col3}]]|[{r_col3}, {s_col3}]|
+------+----------------------+--------------------+

df = df.withColumn("col2_inner_expand", F.explode("col2_outer_expand"))
+------+----------------------+--------------------+-----------------+
|col1  |col2                  |col2_outer_expand   |col2_inner_expand|
+------+----------------------+--------------------+-----------------+
|a_col1|[[{p_col3}, {q_col3}]]|[{p_col3}, {q_col3}]|{p_col3}         |
|a_col1|[[{p_col3}, {q_col3}]]|[{p_col3}, {q_col3}]|{q_col3}         |
|b_col1|[[{r_col3}, {s_col3}]]|[{r_col3}, {s_col3}]|{r_col3}         |
|b_col1|[[{r_col3}, {s_col3}]]|[{r_col3}, {s_col3}]|{s_col3}         |
+------+----------------------+--------------------+-----------------+

Extract the col3 field from the struct:

df = df.withColumn("col3", F.col("col2_inner_expand").getField("col3"))
+------+----------------------+--------------------+-----------------+------+
|col1  |col2                  |col2_outer_expand   |col2_inner_expand|col3  |
+------+----------------------+--------------------+-----------------+------+
|a_col1|[[{p_col3}, {q_col3}]]|[{p_col3}, {q_col3}]|{p_col3}         |p_col3|
|a_col1|[[{p_col3}, {q_col3}]]|[{p_col3}, {q_col3}]|{q_col3}         |q_col3|
|b_col1|[[{r_col3}, {s_col3}]]|[{r_col3}, {s_col3}]|{r_col3}         |r_col3|
|b_col1|[[{r_col3}, {s_col3}]]|[{r_col3}, {s_col3}]|{s_col3}         |s_col3|
+------+----------------------+--------------------+-----------------+------+

Create a new col4 column alongside col3:

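# substring() positions are 1-based; the intermediate *_expand columns are left out of the output shown below.
df = df.withColumn("col4", F.concat(F.substring("col3", 1, 1), F.lit("_col4")))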
+------+----------------------+------+------+
|col1  |col2                  |col3  |col4  |
+------+----------------------+------+------+
|a_col1|[[{p_col3}, {q_col3}]]|p_col3|p_col4|
|a_col1|[[{p_col3}, {q_col3}]]|q_col3|q_col4|
|b_col1|[[{r_col3}, {s_col3}]]|r_col3|r_col4|
|b_col1|[[{r_col3}, {s_col3}]]|s_col3|s_col4|
+------+----------------------+------+------+

Now reverse all of the operations above.

Combine col3 and col4 into a struct/tuple:

df = df.withColumn("col2_inner_group", F.struct("col3", "col4"))
+------+----------------------+------+------+----------------+
|col1  |col2                  |col3  |col4  |col2_inner_group|
+------+----------------------+------+------+----------------+
|a_col1|[[{p_col3}, {q_col3}]]|p_col3|p_col4|{p_col3, p_col4}|
|a_col1|[[{p_col3}, {q_col3}]]|q_col3|q_col4|{q_col3, q_col4}|
|b_col1|[[{r_col3}, {s_col3}]]|r_col3|r_col4|{r_col3, r_col4}|
|b_col1|[[{r_col3}, {s_col3}]]|s_col3|s_col4|{s_col3, s_col4}|
+------+----------------------+------+------+----------------+

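Roll up twice, since there are two nested levels. Both aggregations group only by col1, so this assumes that col1 is unique per row and that each col2 holds exactly one inner array, as in the sample data. Note also that collect_list() does not guarantee element order after a shuffle.

df = df.groupby("col1").agg(F.collect_list("col2_inner_group").alias("col2_outer_group"))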
+------+------------------------------------+
|col1  |col2_outer_group                    |
+------+------------------------------------+
|a_col1|[{p_col3, p_col4}, {q_col3, q_col4}]|
|b_col1|[{r_col3, r_col4}, {s_col3, s_col4}]|
+------+------------------------------------+

df = df.groupby("col1").agg(F.collect_list("col2_outer_group").alias("col2"))
+------+--------------------------------------+
|col1  |col2                                  |
+------+--------------------------------------+
|a_col1|[[{p_col3, p_col4}, {q_col3, q_col4}]]|
|b_col1|[[{r_col3, r_col4}, {s_col3, s_col4}]]|
+------+--------------------------------------+

Finally, the new column is added at the desired place.
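
As an alternative sketch, separate from the explode()/groupby() steps: Spark 2.4 also provides the SQL higher-order function transform, which can rebuild each struct in place, so the rows, the nesting and the element order are left untouched. The names ADD_COL4_EXPR and add_col4 and the lambda variables inner and s are made up for this illustration, and the col4 value simply reuses the first-letter rule from the sample. I have assumed the schema shown in the question.

```python
# SQL expression built from Spark 2.4 higher-order functions.
# The outer transform walks the outer array, the inner transform walks
# each inner array, and named_struct rebuilds every struct with col4 added.
ADD_COL4_EXPR = """
transform(col2, inner ->
  transform(inner, s ->
    named_struct(
      'col3', s.col3,
      'col4', concat(substring(s.col3, 1, 1), '_col4'))))
"""


def add_col4(df):
    """Return df with col4 added inside every struct of col2 (hypothetical helper)."""
    # Imported here so the expression above can be inspected without Spark.
    from pyspark.sql import functions as F

    return df.withColumn("col2", F.expr(ADD_COL4_EXPR))
```

With the sample DataFrame from above, add_col4(df).printSchema() should show col4 next to col3 in the innermost struct. A null outer or inner array stays null, while a null struct element would come back as a struct whose fields are null.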

Azhar Khan