I have a DataFrame which contains the following details.

|id|Name|Country|version|
|1 |Jack|UK     |new    |
|1 |Jack|USA    |old    |
|2 |Rose|Germany|new    |
|3 |Sam |France |old    |

I would like to create a DataFrame where, if rows are duplicated on "id", the new version is picked over the old version, like so:

|id|Name|Country|version|
|1 |Jack|UK     |new    |
|2 |Rose|Germany|new    |
|3 |Sam |France |old    |

What is the best way to do this in Java/Spark, or do I have to use some sort of nested SQL query?

The simplified SQL version would look something like below:

WITH new_version AS (
    SELECT
      ad.id
      ,ad.name
      ,ad.country
      ,ad.version
    FROM allData ad
    WHERE ad.version = 'new'
),
old_version AS (
    SELECT
      ad.id
      ,ad.name
      ,ad.country
      ,ad.version
    FROM allData ad
    LEFT JOIN new_version nv on nv.id = ad.id
    WHERE ad.version = 'old'
      AND nv.id is null
)

SELECT id, name, country, version FROM new_version
UNION ALL
SELECT id, name, country, version FROM old_version
Shaido
Achilles

2 Answers


Supposing you have a dataframe as

+---+----+-------+-------+
|id |Name|Country|version|
+---+----+-------+-------+
|1  |Jack|UK     |new    |
|1  |Jack|USA    |old    |
|2  |Rose|Germany|new    |
|3  |Sam |France |old    |
+---+----+-------+-------+

which is created using

val df = Seq(
  ("1","Jack","UK","new"),
  ("1","Jack","USA","old"),
  ("2","Rose","Germany","new"),
  ("3","Sam","France","old")
).toDF("id","Name","Country","version")

You can meet the requirement of the given SQL query by removing every row whose version is old when the same id also has a new row, using the Window, rank, filter and drop functions as below

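import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, rank}

// "new" sorts before "old", so within each id the new row (if any) gets rank 1
val windowSpec = Window.partitionBy("id").orderBy("version")

// drop an old row only when a better-ranked (new) row exists for the same id
df.withColumn("rank", rank().over(windowSpec))
  .filter(!(col("version") === "old" && col("rank") > 1))
  .drop("rank")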

You should get the final dataframe as

+---+----+-------+-------+
|id |Name|Country|version|
+---+----+-------+-------+
|3  |Sam |France |old    |
|1  |Jack|UK     |new    |
|2  |Rose|Germany|new    |
+---+----+-------+-------+
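
For reference, here is a self-contained sketch of the same window idea that can be run outside the shell. It is a variant, not the exact code above: it uses row_number instead of rank and keeps only the first row per id, so if an id ever had two rows with the same version, one of them would be picked arbitrarily. The object name KeepNewestVersion, the helper keepNewest and the local SparkSession settings are assumptions made for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Hypothetical wrapper object, not part of the original answer.
object KeepNewestVersion {

  // Keep one row per id, preferring version "new" over "old".
  def keepNewest(df: DataFrame): DataFrame = {
    // "new" sorts before "old" alphabetically, so ascending order puts it first.
    val byId = Window.partitionBy("id").orderBy(col("version").asc)
    df.withColumn("rn", row_number().over(byId))
      .filter(col("rn") === 1)
      .drop("rn")
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is good enough for trying this out.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("keep-newest-version")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("1", "Jack", "UK", "new"),
      ("1", "Jack", "USA", "old"),
      ("2", "Rose", "Germany", "new"),
      ("3", "Sam", "France", "old")
    ).toDF("id", "Name", "Country", "version")

    keepNewest(df).show(false)
    spark.stop()
  }
}
```

This relies on the version values being exactly "new" and "old"; with other labels the orderBy would need an explicit priority expression.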
Ramesh Maharjan

For older versions of Spark you can use orderBy in combination with groupBy. According to the answer to this question, the order should be kept after a groupBy if the dataframe is sorted by that column. Hence, the following should work (note that the orderBy is on both the id and version columns, so that for each id the new row comes before the old one):

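import org.apache.spark.sql.functions.first

// sort so that "new" precedes "old" within each id, then take the first row per id
val df2 = df.orderBy("id", "version")
  .groupBy("id")
  .agg(first("Name").as("Name"), first("Country").as("Country"), first("version").as("version"))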

This will yield the following result:

+---+----+-------+-------+
| id|Name|Country|version|
+---+----+-------+-------+
|  3| Sam| France|    old|
|  1|Jack|     UK|    new|
|  2|Rose|Germany|    new|
+---+----+-------+-------+
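
If you would rather not depend on the row order surviving the groupBy, one possible alternative (not part of the approach above) is to aggregate with min over a struct whose first field is version. Structs are compared field by field, so the row with version "new" wins whenever one exists. This is a minimal sketch; the object name KeepNewestByStruct, the helper keepNewest and the column alias best are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, min, struct}

// Hypothetical wrapper object, not part of the original answer.
object KeepNewestByStruct {

  // One row per id: the struct with the smallest version ("new" < "old") is kept.
  def keepNewest(df: DataFrame): DataFrame =
    df.groupBy("id")
      .agg(min(struct(col("version"), col("Name"), col("Country"))).as("best"))
      // unpack the struct back into the original column layout
      .select(col("id"), col("best.Name"), col("best.Country"), col("best.version"))

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is good enough for trying this out.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("keep-newest-by-struct")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("1", "Jack", "UK", "new"),
      ("1", "Jack", "USA", "old"),
      ("2", "Rose", "Germany", "new"),
      ("3", "Sam", "France", "old")
    ).toDF("id", "Name", "Country", "version")

    keepNewest(df).show(false)
    spark.stop()
  }
}
```

Ties on version are broken by Name and then Country, so the result is deterministic even when an id has several rows with the same version.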
Shaido