
I know this question has been asked here in similar form, but nobody has raised the case where one or more of the dataframes in the reduce are empty. With an inner join, the final result in that case will always be empty.

If I have:

dfList = List(
+--------+-------------+  +--------+-------------+  +--------+--------+  +--------+--------+
|      ID|            A|  |      ID|            B|  |      ID|       C|  |      ID|       D|
+--------+-------------+  +--------+-------------+  +--------+--------+  +--------+--------+
|    9574|            F|  |    9574|       005912|  |    9574| 2016022|  |    9574|      VD|
|    9576|            F|  |    9576|       005912|  |    9576| 2016022|  |    9576|      VD|
|    9578|            F|  |    9578|       005912|  |    9578| 2016022|  |    9578|      VD|
|    9580|            F|  |    9580|       005912|  |    9580| 2016022|  |    9580|      VD|
|    9582|            F|  |    9582|       005912|  |    9582| 2016022|  |    9582|      VD|
+--------+-------------+, +--------+-------------+, +--------+--------+, +--------+--------+
)

and I want to reduce this list into a single dataframe, I could easily do:

dfList.reduce(
  (df1, df2) => df1.join(df2, Seq("ID")).localCheckpoint(true)
)

Whatever the join type (inner, left, or right), if one of the first few dataframes in the list is empty, the final result will be empty.

One way could be:

dfList.filter(!_.rdd.isEmpty())

but it takes a lot of time, so it is not good in terms of performance. Do you have any suggestions?
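A possibly lighter variant, assuming Spark 2.4+ where Dataset has isEmpty (it fetches a single row from the optimized plan instead of converting the dataframe to an RDD first), would be the following sketch; whether it is actually faster here would need measuring:

// Dataset.isEmpty (Spark 2.4+) runs a limit(1)-style check on the
// optimized plan, avoiding the DataFrame-to-RDD conversion above.
val nonEmpty = dfList.filter(df => !df.isEmpty)
val joined = nonEmpty.reduce((df1, df2) => df1.join(df2, Seq("ID")))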

  • Inside your reduce function you can both check for emptiness and do the join, but I don't know if this actually affects performance much! Something like {if (df2.isEmpty) df1 else df1.join(df2, …)} – AminMal May 20 '22 at 11:37
  • Yes, I know, but that is exactly the problem: it takes too much time – Federico Rizzo May 20 '22 at 12:10
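For reference, a minimal Scala sketch of the approach from the comment above (Dataset.isEmpty requires Spark 2.4+; each check still launches a small job, so the performance concern may remain):

// Guard against empty frames inside the reduce itself, as suggested above.
val result = dfList.reduce { (df1, df2) =>
  if (df1.isEmpty) df2
  else if (df2.isEmpty) df1
  else df1.join(df2, Seq("ID")).localCheckpoint(true)
}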

1 Answer


If you are on Spark 3.1+, you can use unionByName with allowMissingColumns and then a group-by:

df1.unionByName(df2, allowMissingColumns=True).groupBy("ID", "A", "B", "C", "D")
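Spelled out in Scala to match the question, a sketch of the same idea could look like this; collapsing the union back to one row per ID with first(..., ignoreNulls = true) is my own choice of aggregate, not something stated in the answer:

import org.apache.spark.sql.functions.first

// Stack all frames by column name; columns missing from a frame are padded
// with nulls (unionByName with allowMissingColumns requires Spark 3.1+).
val unioned = dfList.reduce(
  (df1, df2) => df1.unionByName(df2, allowMissingColumns = true)
)

// Collapse back to one row per ID: first(..., ignoreNulls = true) keeps the
// non-null value contributed by whichever input frame had that column.
val aggs = unioned.columns.filter(_ != "ID")
  .map(c => first(c, ignoreNulls = true).as(c))
val result = unioned.groupBy("ID").agg(aggs.head, aggs.tail: _*)

A side effect worth noting: an empty dataframe contributes no rows to the union, so the empty-input problem from the question disappears without any explicit emptiness checks.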

In other versions you can do something similar, but I haven't tested the performance. (And there's a catch: it only works on numeric fields, so likely you'd have to do some translation back and forth.)

from pyspark.sql import functions as f

# Tag each value with the table it came from, stack the frames,
# then pivot the tags back out into one column per source table.
(df2.select(
     df2.ID,
     df2.B.alias("col1"),
     f.lit("B").alias("table"))
 .union(
     df1.select(
         df1.ID,
         df1.A.alias("col1"),
         f.lit("A").alias("table")))
 .groupBy("ID")
 .pivot("table")
 .max("col1")
 .show())
Matt Andruff