I have multiple data files, one per week, all in the same format. I need to consolidate them using Scala code that runs on Spark. The end result should contain only unique records by key, and where the same key appears in several files, it should keep the record from the latest file.

Each data file can contain close to half a billion records, so the code has to perform well.

Example:

Latest data file

CID PID Metric
C1  P1  10
C2  P1  20
C2  P2  30

Previous data file

CID PID Metric
C1  P1  20
C2  P1  30
C3  P1  40
C3  P2  50

Oldest data file

CID PID Metric
C1  P1  30
C2  P1  40
C3  P1  50
C3  P2  60
C4  P1  30

Expected output file

C1  P1  10
C2  P1  20
C2  P2  30
C3  P1  40
C3  P2  50
C4  P1  30
zero323

1 Answer

An age column can be added to each DataFrame, the DataFrames can then be unioned into one, and a window function can be used to keep the most recent record for each key:

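// imports (spark is the active SparkSession, as in spark-shell)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// data preparation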
val columnNames = List("CID", "PID", "Metric")
val latest = List(
  ("C1", "P1", 10),
  ("C2", "P1", 20),
  ("C2", "P2", 30)
).toDF(columnNames: _*)

val previous = List(
  ("C1", "P1", 20),
  ("C2", "P1", 30),
  ("C3", "P1", 40),
  ("C3", "P2", 50)
).toDF(columnNames: _*)

val oldest = List(
  ("C1", "P1", 30),
  ("C2", "P1", 40),
  ("C3", "P1", 50),
  ("C3", "P2", 60),
  ("C4", "P1", 30)
).toDF(columnNames: _*)
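// end of data preparation

// Order matters: oldest first, so a higher index means a newer file
val dfList = List(oldest, previous, latest)
val dfListWithIndexColumn = dfList.zipWithIndex.map { case (df, index) => df.withColumn("age", lit(index)) }
val unitedDF = dfListWithIndexColumn.reduce(_ union _)

// Number the rows of each (CID, PID) group from newest to oldest and keep the first.
// row_number, unlike rank, yields exactly one row per key even if a key repeats within one file.
val cidPidWindow = Window.partitionBy("CID", "PID").orderBy($"age".desc)
val result = unitedDF
  .withColumn("rn", row_number().over(cidPidWindow))
  .where($"rn" === 1)
  .drop("age", "rn")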

result.show(false)

Output:

+---+---+------+
|CID|PID|Metric|
+---+---+------+
|C1 |P1 |10    |
|C2 |P1 |20    |
|C2 |P2 |30    |
|C3 |P1 |40    |
|C3 |P2 |50    |
|C4 |P1 |30    |
+---+---+------+
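
Since the question stresses volume, here is a sketch of a variant that replaces the window with a plain aggregation. It is not part of the approach above: the helper name latestPerKey and the tiny sample data are made up for illustration, and it assumes the DataFrames are passed oldest first and share the columns CID, PID, Metric. Spark compares structs field by field, so taking max over struct(age, Metric) selects the Metric that came with the highest age. Partial aggregation can shrink the data before the shuffle, but whether this beats the window version on real data is something to measure.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, max, struct}

// Local session for the sketch; in spark-shell `spark` already exists
val spark = SparkSession.builder().master("local[*]").appName("latest-per-key").getOrCreate()
import spark.implicits._

// Hypothetical helper: dfs must be ordered oldest first
def latestPerKey(dfs: Seq[DataFrame]): DataFrame =
  dfs.zipWithIndex
    .map { case (df, age) => df.withColumn("age", lit(age)) } // higher age = newer file
    .reduce(_ union _)
    .groupBy("CID", "PID")
    // age is the first struct field, so max picks the entry from the newest file
    .agg(max(struct(col("age"), col("Metric"))).as("newest"))
    .select(col("CID"), col("PID"), col("newest.Metric").as("Metric"))

// Made-up two-file sample: C1/P1 appears in both, C4/P1 only in the older file
val older = Seq(("C1", "P1", 30), ("C4", "P1", 30)).toDF("CID", "PID", "Metric")
val newer = Seq(("C1", "P1", 10)).toDF("CID", "PID", "Metric")
val merged = latestPerKey(Seq(older, newer))
```

With more value columns than Metric, each of them would need to go into the struct after age and be selected back out afterwards.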
pasha701