
I'm using Spark to load JSON files from Amazon S3. I would like to remove duplicates based on two columns of the data frame, retaining the newest (I have a timestamp column). What would be the best way to do it? Please note that the duplicates may be spread across partitions. Can I remove duplicates retaining the last record without shuffling? I'm dealing with 1 TB of data.

I was thinking of partitioning the data frame by those two columns in such a way that all duplicate records will be "consistently hashed" into the same partition, so that a partition-level sort followed by a drop duplicates would eliminate all duplicates, keeping just one. I don't know if that's possible. Any information is appreciated.
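Roughly the kind of thing I had in mind (a sketch only; c1 as the timestamp and c2, c3 as the key columns are placeholders, and I'm not sure dropDuplicates is guaranteed to keep the first row within a sorted partition):

from pyspark.sql import functions as F

# co-locate all rows sharing the same key in one partition, sort newest-first,
# then keep one row per key (dropDuplicates does not document which row it keeps)
deduped = df.repartition('c2', 'c3') \
            .sortWithinPartitions('c2', 'c3', F.col('c1').desc()) \
            .dropDuplicates(['c2', 'c3'])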

lalatnayak
  • [pyspark remove duplicates from dataframe keeping the last appearance](https://stackoverflow.com/questions/53284881/pyspark-remove-duplicates-from-dataframe-keeping-the-last-appearance) – Prathik Kini Apr 15 '19 at 07:45

1 Answer


Using the row_number() Window function is probably easier for your task. Below, c1 is the timestamp column and c2, c3 are the columns used to partition your data:

from pyspark.sql import Window, functions as F

# create a win spec which is partitioned by c2, c3 and ordered by c1 in descending order
win = Window.partitionBy('c2', 'c3').orderBy(F.col('c1').desc())

# set rn with F.row_number() and filter the result by rn == 1
df_new = df.withColumn('rn', F.row_number().over(win)).where('rn = 1').drop('rn')
df_new.show()
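For illustration, here is a minimal self-contained run of the above (the toy data and column names are made up for the example):

from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.getOrCreate()

# toy frame: c1 is the timestamp, (c2, c3) is the key; key ('a', 'x') is duplicated
df = spark.createDataFrame(
    [(1, 'a', 'x'), (3, 'a', 'x'), (2, 'b', 'y')],
    ['c1', 'c2', 'c3'])

win = Window.partitionBy('c2', 'c3').orderBy(F.col('c1').desc())
df.withColumn('rn', F.row_number().over(win)).where('rn = 1').drop('rn').show()
# keeps c1=3 for key ('a', 'x') and the single row c1=2 for key ('b', 'y')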

Edit:

If you need only the duplicated keys (and want to drop rows whose key appears just once), add another column:

from pyspark.sql import Window, functions as F

# create a win spec which is partitioned by c2, c3 and ordered by c1 in descending order
win = Window.partitionBy('c2', 'c3').orderBy(F.col('c1').desc())

# window to cover all rows in the same partition
win2 = Window.partitionBy('c2', 'c3') \
             .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# set new columns: rn, cnt and filter the result by rn == 1 and cnt > 1
df_new = df.withColumn('rn', F.row_number().over(win)) \
           .withColumn('cnt', F.count('c1').over(win2)) \
           .where('rn = 1 and cnt > 1') \
           .drop('rn', 'cnt')
df_new.show()
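Note that win2 has no ordering, so with the unbounded frame F.count('c1').over(win2) counts every row sharing the same (c2, c3) key; the condition cnt > 1 therefore keeps only keys that actually had duplicates. With the toy data from the earlier sketch, only the row c1=3, c2='a', c3='x' would come back, since key ('b', 'y') occurs once.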
jxc
  • I removed my answer since this one will be much more efficient without join! – abiratsis Apr 16 '19 at 11:04
  • What about this? `window = Window.partitionBy(partition_columns).orderBy(F.desc(sort_key))` `Window.partitionBy(partition_columns).orderBy(F.asc(sort_key))` `data_frame = data_frame.withColumn('rank', F.rank().over(window)).withColumn('row_number', F.row_number().over(window)).filter((F.col('rank') == 1) & (F.col('row_number') == 1)).drop('rank', 'row_number')` – lalatnayak Apr 16 '19 at 11:57
  • @lalatnayak are you using the same window spec for rank() and row_number()? I think they return the same values with the default window frame. – jxc Apr 16 '19 at 12:15
  • Thanks for pointing that out. Actually, rank alone will suffice for me, I think. Quick question: what impact do a filter and a union (with a data frame that has a different schema) have on partitions? – lalatnayak Apr 16 '19 at 20:17
  • @lalatnayak, I don't know why you want to union data frames with different schemas. Did you mean `join`? I don't think `filter` and `union` have a direct impact on partitions; it really depends on the method chain, e.g. a union(..) followed by a distinct(). You might try `explain()` and see whether partitioning or shuffling is involved in your transformations. I am not an expert in this part, so the above is just my 2 cents. – jxc Apr 17 '19 at 13:50