
How do I flatten nested arrays with different shapes in PySpark? How to flatten nested arrays by merging values in spark answers this for arrays with the same shape, but for arrays with different shapes I get the error described below.

Data structure:

  • Static names: id, date, val, num (can be hardcoded)
  • Dynamic names: name_1_a, name_10000_xvz (cannot be hardcoded, as the data frame has up to 10000 columns/arrays)

Input df:

root
 |-- id: long (nullable = true)
 |-- name_10000_xvz: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- num: long (nullable = true)  **NOTE: additional `num` field**
 |    |    |-- val: long (nullable = true)
 |-- name_1_a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)



df.show(truncate=False)
+---+---------------------------------------------------------------------+---------------------------------+
|id |name_10000_xvz                                                       |name_1_a                         |
+---+---------------------------------------------------------------------+---------------------------------+
|1  |[{2000, null, 30}, {2001, null, 31}, {2002, null, 32}, {2003, 1, 33}]|[{2001, 1}, {2002, 2}, {2003, 3}]|
+---+---------------------------------------------------------------------+---------------------------------+

Required output df:

+---+--------------+----+---+---+
| id|          name|date|val|num|
+---+--------------+----+---+---+
|  1|      name_1_a|2001|  1|   |
|  1|      name_1_a|2002|  2|   |
|  1|      name_1_a|2003|  3|   |
|  1|name_10000_xvz|2000| 30|   |
|  1|name_10000_xvz|2001| 31|   |
|  1|name_10000_xvz|2002| 32|   |
|  1|name_10000_xvz|2003| 33| 1 |
+---+--------------+----+---+---+

Code to reproduce:

NOTE: when I add `el.num`, as in `TRANSFORM({name}, el -> STRUCT("{name}" AS name, el.date, el.val, el.num))`, I get the error below.

import pyspark.sql.functions as f


df = spark.read.json(
    sc.parallelize(
        [
            """{"id":1,"name_1_a":[{"date":2001,"val":1},{"date":2002,"val":2},{"date":2003,"val":3}],"name_10000_xvz":[{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33, "num":1}]}"""
        ]
    )
).select("id", "name_1_a", "name_10000_xvz")

names = [column for column in df.columns if column.startswith("name_")]

expressions = []
for name in names:
    expressions.append(
        f.expr(
            'TRANSFORM({name}, el -> STRUCT("{name}" AS name, el.date, el.val, el.num))'.format(
                name=name
            )
        )
    )

flatten_df = df.withColumn("flatten", f.flatten(f.array(*expressions))).selectExpr(
    "id", "inline(flatten)"
)

Output:

AnalysisException: No such struct field num in date, val
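
The error occurs because `num` is genuinely absent from the element schema of `name_1_a` (see the printSchema output above), not merely NULL-valued. As a quick illustration, here is a minimal check of which struct fields each dynamic array column actually carries, assuming the repro `df` above (the loop and variable names are only illustrative):

# Illustrative check: list the struct fields of each dynamic array column
for name in [c for c in df.columns if c.startswith("name_")]:
    element_type = df.schema[name].dataType.elementType  # StructType of the array elements
    print(name, element_type.fieldNames())
# Expected to show that name_1_a has only ['date', 'val'],
# while name_10000_xvz also carries 'num'.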
Dan
  • Is it normal that "name_1_a" does not contain the "num" field? That is the reason for your exception. The underlying question is: when a field value is "missing", is the value set to NULL, or is the field missing entirely? – Steven Aug 27 '21 at 09:41
  • Yes, the field num is missing; that is my problem. – Dan Aug 27 '21 at 09:47
  • I can solve the problem in pandas, as shown in this example https://stackoverflow.com/questions/68941232/how-to-explode-pandas-data-frame-with-json-arrays, but not in PySpark. – Dan Aug 27 '21 at 09:49
  • Is your input JSON? Can you show a bit more? More lines, with more columns, in their original format? – Steven Aug 27 '21 at 14:59

2 Answers


You need to explode each array individually, probably use a UDF to fill in the missing values, and unionAll the newly created dataframes. That's for the PySpark part. For the Python part, you just need to loop through the different columns and let the magic happen:

from functools import reduce
from pyspark.sql import functions as F, types as T


@F.udf(T.MapType(T.StringType(), T.LongType()))
def add_missing_fields(name_col):
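    # Return every expected key (date, num, val) for each element; keys missing
    # from the incoming struct are filled with None.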
    out = {}
    expected_fields = ["date", "num", "val"]
    for field in expected_fields:
        if field in name_col:
            out[field] = name_col[field]
        else:
            out[field] = None
    return out


flatten_df = reduce(
    lambda a, b: a.unionAll(b),
    (
        df.withColumn(col, F.explode(col))
        .withColumn(col, add_missing_fields(F.col(col)))
        .select(
            "id",
            F.lit(col).alias("name"),
            F.col(col).getItem("date").alias("date"),
            F.col(col).getItem("val").alias("val"),
            F.col(col).getItem("num").alias("num"),
        )
        for col in df.columns
        if col != "id"
    ),
)

Here is the result:

flatten_df.show()
+---+--------------+----+---+----+
| id|          name|date|val| num|
+---+--------------+----+---+----+
|  1|      name_1_a|2001|  1|null|
|  1|      name_1_a|2002|  2|null|
|  1|      name_1_a|2003|  3|null|
|  1|name_10000_xvz|2000| 30|null|
|  1|name_10000_xvz|2001| 31|null|
|  1|name_10000_xvz|2002| 32|null|
|  1|name_10000_xvz|2003| 33|   1|
+---+--------------+----+---+----+

Another solution, without using unionAll:

c = [col for col in df.columns if col != "id"]

@F.udf(T.ArrayType(T.MapType(T.StringType(), T.LongType())))
def add_missing_fields(name_col):
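    # Same idea, but applied to the whole array at once: each element becomes
    # a map carrying the full set of keys, with None for the missing ones.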
    out = []
    expected_fields = ["date", "num", "val"]
    for elt in name_col:
        new_map = {}
        for field in expected_fields:
            if field in elt:
                new_map[field] = elt[field]
            else:
                new_map[field] = None
        out.append(new_map)
    return out

df1 = reduce(
    lambda a, b: a.withColumn(
        b, F.struct(F.lit(b).alias("name"), add_missing_fields(b).alias("values"))
    ),
    c,
    df,
)

df2 = (
    df1.withColumn("names", F.explode(F.array(*(F.col(col) for col in c))))
    .withColumn("value", F.explode("names.values"))
    .select(
        "id",
        F.col("names.name").alias("name"),
        F.col("value").getItem("date").alias("date"),
        F.col("value").getItem("val").alias("val"),
        F.col("value").getItem("num").alias("num"),
    )
)

And the result:

df2.show()
+---+--------------+----+---+----+                                              
| id|          name|date|val| num|
+---+--------------+----+---+----+
|  1|      name_1_a|2001|  1|null|
|  1|      name_1_a|2002|  2|null|
|  1|      name_1_a|2003|  3|null|
|  1|name_10000_xvz|2000| 30|null|
|  1|name_10000_xvz|2001| 31|null|
|  1|name_10000_xvz|2002| 32|null|
|  1|name_10000_xvz|2003| 33|   1|
+---+--------------+----+---+----+
Steven
  • Thanks a lot for your help, I was not thinking about this solution; I'll test it out. – Dan Aug 27 '21 at 10:11
  • @dan sorry, I forgot to apply the "missing_field" function to the dataframe. Here is my edit. – Steven Aug 27 '21 at 10:22
  • @Steven thanks for the solution, it works well on small datasets, but on bigger datasets `unionAll` just gets stuck, as it is joining a large number of data frames. Any idea how to avoid using `unionAll`? – Dan Aug 27 '21 at 13:34
  • @dan I added another solution without `unionAll`. The best option for you would be to fix the input schema so you do not have to use the UDF. – Steven Aug 27 '21 at 14:59
  • Hi @Steven, I was checking your solution without unionAll. `df1` relies on the variable `c`, which is not defined. Can you help? – Dan Sep 30 '21 at 14:30
  • @dan yes, that's right, but I think it is simply: `c = [col for col in df.columns if col != "id"]`. I edited the code. – Steven Sep 30 '21 at 14:40
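
Following up on the comment above about avoiding the UDF: below is a minimal sketch (assuming the repro `df` and column layout from the question; the `expected_fields` list and the expression-building loop are illustrative, not part of the answer) that builds each TRANSFORM expression from the column's actual element schema, padding missing fields with typed NULLs so all the structs line up:

import pyspark.sql.functions as f

# Build one TRANSFORM expression per dynamic column, reading the fields it
# actually has from the schema and padding the missing ones with NULL casts.
expected_fields = ["date", "val", "num"]

expressions = []
for name in [c for c in df.columns if c != "id"]:
    present = df.schema[name].dataType.elementType.fieldNames()
    fields = ", ".join(
        "el.{0}".format(fld) if fld in present
        else "CAST(NULL AS BIGINT) AS {0}".format(fld)
        for fld in expected_fields
    )
    expressions.append(
        f.expr('TRANSFORM({0}, el -> STRUCT("{0}" AS name, {1}))'.format(name, fields))
    )

flatten_df = df.withColumn("flatten", f.flatten(f.array(*expressions))).selectExpr(
    "id", "inline(flatten)"
)

On the sample data this should give the same rows as the outputs above, with num NULL wherever the field is absent, and no Python UDF involved.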

You can just use the explode function, then read the individual column values and create the additional columns you need for each array column, and at the end union both dataframes to get the required output.

# Sample data creation and reading
df = spark.read.json(sc.parallelize([
    """{"id":1,"name_1_a":[{"date":2001,"val":1},{"date":2002,"val":2},{"date":2003,"val":3}],"name_10000_xvz":[{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33, "num":1}]}"""
])).select('id', 'name_1_a', 'name_10000_xvz')

import pyspark.sql.functions as f

# Explode the first array column; it has no num field, so add a typed NULL
df1 = (
    df.withColumn("name_1_a_array", f.explode(f.col("name_1_a")))
    .select("id", "name_1_a_array")
    .withColumn("date", f.col("name_1_a_array.date"))
    .withColumn("val", f.col("name_1_a_array.val"))
    .withColumn("num", f.lit(None).cast("long"))
    .drop("name_1_a_array")
)

# Explode the second array column, which already carries the num field
df2 = (
    df.withColumn("name_10000_xvz_array", f.explode(f.col("name_10000_xvz")))
    .select("id", "name_10000_xvz_array")
    .withColumn("date", f.col("name_10000_xvz_array.date"))
    .withColumn("val", f.col("name_10000_xvz_array.val"))
    .withColumn("num", f.col("name_10000_xvz_array.num"))
    .drop("name_10000_xvz_array")
)

# Union of both dataframes
df3 = df1.union(df2)
display(df3)  # Databricks notebook helper; use df3.show() elsewhere

The output matches the required result.


Nikunj Kakadiya
  • Thanks for your help. Not sure if I made it clear, but the array names can be any string and there are around 10000 of them, so with this solution I would need to hardcode all the names many times... – Dan Aug 27 '21 at 10:13
  • You would have to hardcode that only once for each column, and that would be part of the schema, so that should not be an issue, if I am understanding your question properly. – Nikunj Kakadiya Aug 27 '21 at 10:16
  • Apologies, I will update the question. I have a dataframe with 10000 columns, `name_1_a` ... `name_10000_xvz`; I will try to make it clearer. – Dan Aug 27 '21 at 10:19
  • It was not me, I prefer to have different answers in this thread, but I assume the reason was that your solution cannot be used in my case, as I would need to hardcode 10000 different columns ... – Dan Aug 27 '21 at 10:51
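
For completeness, here is a sketch of how this answer's per-column explode/union pattern could be generalised with a loop so the column names are not hardcoded (the `parts` list, the schema lookup, and the added name column are illustrative and not part of the original answer; it still unions one dataframe per column, so the scaling concern raised in the comments under the first answer applies):

from functools import reduce
import pyspark.sql.functions as f

parts = []
for name in [c for c in df.columns if c != "id"]:
    # Fields this particular array column actually carries
    fields = df.schema[name].dataType.elementType.fieldNames()
    exploded = df.withColumn("e", f.explode(name)).select(
        "id",
        f.lit(name).alias("name"),
        f.col("e.date").alias("date"),
        f.col("e.val").alias("val"),
        (f.col("e.num") if "num" in fields else f.lit(None).cast("long")).alias("num"),
    )
    parts.append(exploded)

df3 = reduce(lambda a, b: a.union(b), parts)
df3.show()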