
I'm trying to convert a list of DataFrames into a single DataFrame, as shown below,
where dfList is a List[sql.DataFrame].

dfList=List([ID: bigint, A: string], [ID: bigint, B: string], [ID: bigint, C: string], [ID: bigint, D: string])

dfList = List( +--------+-------------+  +--------+-------------+  +--------+--------+  +--------+--------+
               |   ID   |      A      |  |   ID   |      B      |  |   ID   |    C   |  |   ID   |    D   |
               +--------+-------------+  +--------+-------------+  +--------+--------+  +--------+--------+
               |    9574|            F|  |    9574|       005912|  |    9574| 2016022|  |    9574|      VD|
               |    9576|            F|  |    9576|       005912|  |    9576| 2016022|  |    9576|      VD|
               |    9578|            F|  |    9578|       005912|  |    9578| 2016022|  |    9578|      VD|
               |    9580|            F|  |    9580|       005912|  |    9580| 2016022|  |    9580|      VD|
               |    9582|            F|  |    9582|       005912|  |    9582| 2016022|  |    9582|      VD|
               +--------+-------------+, +--------+-------------+, +--------+--------+, +--------+--------+ )

Expected Output

+--------+-------------+----------+--------+-------+
|   ID   |     A       |      B   |  C     |  D    |
+--------+-------------+----------+--------+-------+
|    9574|            F|    005912| 2016022|     00|
|    9576|            F|    005912| 2016022|     01|
|    9578|            F|    005912| 2016022|     20|
|    9580|            F|    005912| 2016022|     19|
|    9582|            F|    005912| 2016022|     89|
+--------+-------------+----------+--------+-------+
the hacker

2 Answers


You will want to use foldLeft with a join.

Generating Data

scala> val dfList = ('a' to 'd').map(col => (1 to 5).zip(col.toInt to col.toInt + 4).toDF("ID", col.toString)).toList
dfList: List[org.apache.spark.sql.DataFrame] = List([ID: int, a: int], [ID: int, b: int], [ID: int, c: int], [ID: int, d: int])

Which gives me the following DataFrames:

+---+---+   +---+---+   +---+---+   +---+---+
| ID|  a|   | ID|  b|   | ID|  c|   | ID|  d|
+---+---+   +---+---+   +---+---+   +---+---+
|  1| 97|   |  1| 98|   |  1| 99|   |  1|100|
|  2| 98|   |  2| 99|   |  2|100|   |  2|101|
|  3| 99|   |  3|100|   |  3|101|   |  3|102|
|  4|100|   |  4|101|   |  4|102|   |  4|103|
|  5|101|   |  5|102|   |  5|103|   |  5|104|
+---+---+   +---+---+   +---+---+   +---+---+

Joining DataFrames

scala> val joinedDF = dfList.tail.foldLeft(dfList.head)((accDF, newDF) => accDF.join(newDF, Seq("ID")))
joinedDF: org.apache.spark.sql.DataFrame = [ID: int, a: int ... 3 more fields]

scala> joinedDF.show
+---+---+---+---+---+
| ID|  a|  b|  c|  d|
+---+---+---+---+---+
|  1| 97| 98| 99|100|
|  2| 98| 99|100|101|
|  3| 99|100|101|102|
|  4|100|101|102|103|
|  5|101|102|103|104|
+---+---+---+---+---+

In Scala, a fold reduces a collection down to a single value. In this case we start with the head of the list (dfList.head) and then join each element in the tail of the list (dfList.tail) onto it to get one final DataFrame. accDF is the accumulating DataFrame (passed from "iteration" to "iteration") and newDF is the next DataFrame to be joined in.
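
To make the mechanics concrete, here is a minimal plain-Scala sketch (my addition, not part of the original answer) showing how foldLeft threads an accumulator through a list; the DataFrame join above follows exactly the same pattern:

// A running sum plays the role of accDF in the join above.
val nums = List(1, 2, 3, 4)

// Start from the head, then fold each remaining element into the accumulator.
val total = nums.tail.foldLeft(nums.head)((acc, next) => acc + next)
// total == 10

// The DataFrame fold expands to the same chained shape:
//   dfList(0).join(dfList(1), Seq("ID"))
//            .join(dfList(2), Seq("ID"))
//            .join(dfList(3), Seq("ID"))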

For more examples of how fold works, see here or here.

evan.oman

@evan058 provided an effective solution, but I would like to add that reduce might be a better choice for parallelized operations:

val joinedDF = dfList.reduce((accDF, nextDF) => accDF.join(nextDF, Seq("ID")))
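
One caveat (my note, not part of the original answer): reduce throws an UnsupportedOperationException on an empty list, so if dfList could be empty, reduceOption is a safer variant:

// Returns None instead of throwing when dfList is empty.
val maybeJoined: Option[org.apache.spark.sql.DataFrame] =
  dfList.reduceOption((accDF, nextDF) => accDF.join(nextDF, Seq("ID")))

maybeJoined.foreach(_.show())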
Leo C