
I have a DataFrame where each row contains an array of Rows.

I want to aggregate all the inner Rows into one DataFrame.

Below is what I have achieved so far:

This

df.select('*').take(1)

Gives me this:

[
   Row(
       body=[
               Row(a=1, b=1), 
               Row(a=2, b=2)
            ]
      )
]

So doing this:

df.rdd.flatMap(lambda x: x).collect()

I get this:

[[
   Row(a=1, b=1),
   Row(a=2, b=2)
]]

So I am forced to do this:

df.rdd.flatMap(lambda x: x).flatMap(lambda x: x)

So I can achieve the below:

[
  Row(a=1, b=1),
  Row(a=2, b=2)
]

Using the result above, I can finally convert it to a DataFrame and save it somewhere, which is what I want. But calling flatMap twice doesn't look right.

I tried to do the same thing using reduce, as in the following code:

from functools import reduce
from pyspark.sql import DataFrame

flatRdd = df.rdd.flatMap(lambda x: x)
dfMerged = reduce(DataFrame.unionByName, [flatRdd])

The second argument of reduce must be iterable, so I was forced to add [flatRdd]. Sadly it gives me this:

[[
   Row(a=1, b=1),
   Row(a=2, b=2)
]]
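For what it's worth, reduce pairwise-combines the items of its iterable, and a one-element list leaves nothing to combine, so reduce just returns that single element unchanged. A minimal plain-Python illustration, using list concatenation as a stand-in for DataFrame.unionByName:

```python
from functools import reduce
import operator

# reduce folds the iterable's items pairwise with the given function:
print(reduce(operator.add, [[1], [2], [3]]))  # [1, 2, 3]

# With a single-element iterable the function is never called and the
# lone element comes back as-is, which is why
# reduce(DataFrame.unionByName, [flatRdd]) just returns flatRdd unchanged:
print(reduce(operator.add, [[1, 2]]))  # [1, 2]
```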

There is certainly a better way to achieve what I want.

Flavio Pegas

2 Answers


IIUC, you can explode and then flatten the resulting Rows using the .* syntax.

Suppose you start with the following DataFrame:

df.show()
#+----------------+
#|            body|
#+----------------+
#|[[1, 1], [2, 2]]|
#+----------------+

with the schema:

df.printSchema()
#root
# |-- body: array (nullable = true)
# |    |-- element: struct (containsNull = true)
# |    |    |-- a: long (nullable = true)
# |    |    |-- b: long (nullable = true)

You can first explode the body column:

from pyspark.sql.functions import explode
df = df.select(explode("body").alias("exploded"))
df.show()
#+--------+
#|exploded|
#+--------+
#|  [1, 1]|
#|  [2, 2]|
#+--------+

Now flatten the exploded column:

df = df.select("exploded.*")
df.show()
#+---+---+
#|  a|  b|
#+---+---+
#|  1|  1|
#|  2|  2|
#+---+---+

Now if you were to call collect, you'd get the desired output:

print(df.collect())
#[Row(a=1, b=1), Row(a=2, b=2)]


pault
  • Thanks man! This is actually very good: explode returned a typed DataFrame to me, based on the Rows I have inside the body. With flatMap, I had to use toDF(schema). – Flavio Pegas Jul 15 '19 at 19:45

You don't need to run flatMap() on the whole Row object; just reference the body field directly by its key:

>>> df.rdd.flatMap(lambda x: x.body).collect()
[Row(a=1, b=1), Row(a=2, b=2)]
jxc