
I applied an algorithm from the question linked below (in the NOTE) to transpose and explode a nested Spark dataframe.

When I define cols = ['a', 'b'] I get an empty dataframe, but when I define cols = ['a'] I get the transformed dataframe for the a column. See the Current code section below for more details. Any help would be appreciated.

I'm looking for required output 2 (Transpose and Explode), but even an example of required output 1 (Transpose) would be very useful.

NOTE: This is a minimal example to highlight the problem; in reality the dataframe schema and array lengths vary, as in the question Pyspark: How to flatten nested arrays by merging values in spark.

Input df:

+---+------------------+--------+
| id|                 a|       b|
+---+------------------+--------+
|  1|[{1, 1}, {11, 11}]|    null|
|  2|              null|[{2, 2}]|
+---+------------------+--------+


root
 |-- id: long (nullable = true)
 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)

Required output 1 (transpose_df):

+---+------+-------------------+
| id| cols |       arrays      |
+---+------+-------------------+
|  1|  a   | [{1, 1}, {11, 11}]|
|  2|  b   | [{2, 2}]          |
+---+------+-------------------+

Required output 2 (explode_df):

+---+----+----+---+
| id|cols|date|val|
+---+----+----+---+
|  1|   a|   1|  1|
|  1|   a|  11| 11|
|  2|   b|   2|  2|
+---+----+----+---+

Current code:

import pyspark.sql.functions as f

df = spark.read.json(sc.parallelize([
  """{"id":1,"a":[{"date":1,"val":1},{"date":11,"val":11}]}""",
  """{"id":2,"b":[{"date":2,"val":2}]}}"""]))

cols = ['a', 'b']

# Wrap each array's elements in a struct tagged with the source column name
expressions = [f.expr('TRANSFORM({col}, el -> STRUCT("{col}" AS cols, el.date, el.val))'.format(col=col)) for col in cols]

transpose_df = df.withColumn('arrays', f.flatten(f.array(*expressions)))

explode_df = transpose_df.selectExpr('id', 'inline(arrays)')

explode_df.show()

Current outcome:

+---+----+----+---+
| id|cols|date|val|
+---+----+----+---+
+---+----+----+---+
Dan

1 Answer

stack might be a better option than transpose for the first step. The TRANSFORM/flatten approach produces an empty dataframe because TRANSFORM(NULL, ...) evaluates to NULL, flatten returns NULL as soon as any element of the outer array is NULL, and inline(NULL) generates no rows.
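
You can verify the null behavior directly (a minimal sketch, assuming an active SparkSession named spark):

spark.sql("SELECT flatten(array(array(1, 2), NULL)) AS res").show()
# res is NULL, and inline(NULL) would generate zero rows

With stack, the null rows can instead be filtered out explicitly after transposing: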


expr = f"stack({len(cols)}," + \
    ",".join([f"'{c}',{c}" for c in cols]) + \
    ")"
#expr = stack(2,'a',a,'b',b)

transpose_df = df.selectExpr("id", expr) \
    .withColumnRenamed("col0", "cols") \
    .withColumnRenamed("col1", "arrays") \
    .filter("not arrays is null")

explode_df = transpose_df.selectExpr('id', 'cols', 'inline(arrays)')
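
With the example data, explode_df.show() should reproduce required output 2 above:

explode_df.show()
# +---+----+----+---+
# | id|cols|date|val|
# +---+----+----+---+
# |  1|   a|   1|  1|
# |  1|   a|  11| 11|
# |  2|   b|   2|  2|
# +---+----+----+---+
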
werner
  • Thanks a lot, that works well for arrays with static fields (`date`, `val`). I asked one more question for the case where the arrays in the columns have dynamic fields that change randomly. Any help will be appreciated. https://stackoverflow.com/questions/69423246/spark-how-to-transpose-and-explode-columns-with-dynamic-nested-arrays – Dan Oct 03 '21 at 08:32