I have created a Spark dataset from a csv file.

The schema is:

 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone: string (nullable = true)

I am performing deduplication on the email field:

Dataset<Row> customer = spark.read().option("header", "true").option("charset", "UTF8")
                    .option("delimiter", ",").csv(path);

Dataset<Row> distinct = customer.select("Email").distinct();

This gives me only the distinct email values. I would like to create an output CSV file containing the complete rows, one for each distinct email ID.

How can I query the dataset so that it returns the full records with distinct emails?

Sample Input:

John David john.david@abc.com 2222
John Smith john.smith@abc.com 4444
John D john.david@abc.com 2222

Sample Output:

John David john.david@abc.com 2222
John Smith john.smith@abc.com 4444

Thanks in advance

CoolBird
  • How should you choose between the two names? Arbitrarily? Also, is email the key column? – abiratsis May 13 '19 at 17:20
  • For now I will retain the first record. I am now able to write a CSV file with the distinct email column, but I would like to write the entire row: `distinct.coalesce(1).write().format("com.databricks.spark.csv").option("header","true").option("delimiter","\t").save(outPath+"outputs.csv");` – CoolBird May 13 '19 at 17:35

1 Answer

Here is one way to do it, using a window function.

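import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.expressions.Window
// Needed for toDF and the $"col" syntax; assumes a SparkSession named `spark` (as in spark-shell)
import spark.implicits._

// Sample data matching the question
val df = Seq(
  ("John", "David", "john.david@abc.com", 2222),
  ("John", "Smith", "john.smith@abc.com", 4444),
  ("John", "D", "john.david@abc.com", 2222)
).toDF("FirstName", "LastName", "Email", "Phone")

// One partition per email; rows inside a partition are ordered by phone
val w = Window.partitionBy($"Email").orderBy($"Phone")

// Number the rows within each partition and keep only the first one
df.withColumn("row", row_number().over(w))
  .where($"row" === 1)
  .drop("row")
  .show(false)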

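The window partitions the rows by Email, orders each partition by Phone, and numbers the rows starting from 1; keeping only row number 1 returns one full row per email. When rows in a partition tie on Phone (as the two john.david@abc.com rows do here), which of them gets number 1 is not guaranteed, so add another column to orderBy if a specific row must be kept.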

Output:

+---------+--------+------------------+-----+
|FirstName|LastName|Email             |Phone|
+---------+--------+------------------+-----+
|John     |Smith   |john.smith@abc.com|4444 |
|John     |David   |john.david@abc.com|2222 |
+---------+--------+------------------+-----+
abiratsis
  • Thank you so much. I was trying this code; is the row_number.over function deprecated in Java? – CoolBird May 13 '19 at 18:26
  • You're welcome. Here is a window example in Java: https://stackoverflow.com/questions/33319279/how-to-use-analytic-window-functions-in-spark-java – abiratsis May 13 '19 at 18:32
  • So you can access it with `Column rowNum = functions.row_number().over(w);` after importing `org.apache.spark.sql.functions` – abiratsis May 13 '19 at 19:03
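
Since the question uses the Java API, here is a rough Java translation of the Scala snippet in the answer, combined with the CSV read and write from the question. It is only a sketch under a few assumptions: the class name `DedupByEmail`, the helper `firstRowPerEmail`, the use of `args[0]` and `args[1]` for the input and output paths, and the `local[*]` master are all made up for illustration, and the Spark SQL library is assumed to be on the classpath.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.row_number;

public class DedupByEmail {

    // Hypothetical helper: returns one complete row per distinct Email value.
    static Dataset<Row> firstRowPerEmail(Dataset<Row> customers) {
        // Rows sharing an Email form one partition; ordering by Phone decides
        // which row is numbered 1 (ties on Phone are broken arbitrarily).
        WindowSpec w = Window.partitionBy(col("Email")).orderBy(col("Phone"));
        return customers
                .withColumn("row", row_number().over(w))
                .where(col("row").equalTo(1))
                .drop("row");
    }

    public static void main(String[] args) {
        String inPath = args[0];   // assumed: path of the input CSV
        String outPath = args[1];  // assumed: output directory for the result

        SparkSession spark = SparkSession.builder()
                .appName("DedupByEmail")
                .master("local[*]") // assumption: local run; drop this when using spark-submit
                .getOrCreate();

        Dataset<Row> customers = spark.read()
                .option("header", "true")
                .option("delimiter", ",")
                .csv(inPath);

        // coalesce(1) produces a single part file inside the output directory
        firstRowPerEmail(customers)
                .coalesce(1)
                .write()
                .option("header", "true")
                .csv(outPath);

        spark.stop();
    }
}
```

If it does not matter which of the duplicate rows survives, `customers.dropDuplicates("Email")` should give a similar result with less code; the window version is mainly useful when the `orderBy` clause has to decide which row to keep.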