
I have a problem similar to this one, but I want to check for duplicates across multiple columns and keep the record with the oldest timestamp.

I tried creating an ordering on a timestamp column and then dropping duplicates (dropDuplicates() keeps the first record and deletes the subsequent ones), and this works:

from pyspark.sql.functions import unix_timestamp
...

# valid_from is the timestamp column.
pattern = "yyyy-MM-dd HH:mm:ss"  # HH: 24-hour clock

# Extract the time from the field and order ascending, so the oldest
# records come first.
df = (df
      .withColumn("timestampCol",
                  unix_timestamp(df["valid_from"], pattern).cast("timestamp"))
      .orderBy("timestampCol", ascending=True))

# Drop duplicates based on all columns except the timestamps, so only
# the records with the oldest timestamps stay.
df = df.dropDuplicates(
    subset=[x for x in df.columns if x not in ["valid_from", "timestampCol"]])

This code works fine for small datasets, but when I try it on a bigger dataset I run into severe performance issues. I found out that dropDuplicates() after orderBy() has atrocious performance. I tried caching the DataFrame, but it did not help much.

The problem is that when the drop-duplicates step starts, I get this at the console:

[Stage x:=============================> (1 + 1) / 2]

And it stays there for almost 20 minutes.

So my questions are these:

  1. Why does dropDuplicates() after orderBy() perform so badly? Is there another way to achieve the same goal (drop duplicates on multiple columns while keeping the oldest record)?

  2. Does the console output mean that only 2 executors are running at that time? If so, how can I increase them? I submit my application to YARN with: --num-executors 5 --executor-cores 5 --executor-memory 20G. Why do I have only two executors running at this particular point, and how can I increase them for this step?
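
For question 2, in case it helps, this is how I look at the parallelism around that step. This is only a sketch of what I am considering; the 200 is an illustrative value I have not tuned:

# The task counter in a stage tracks partitions, so inspect how many
# partitions the DataFrame has just before the slow stage.
print(df.rdd.getNumPartitions())

# Repartitioning on the deduplication keys before the heavy step
# raises the number of tasks in that stage.
dedup_cols = [x for x in df.columns if x not in ["valid_from", "timestampCol"]]
df = df.repartition(200, *dedup_cols)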

  • This code is so slow because it has to shuffle all data twice. Also, it is not correct. There is no explicit guarantee that the oldest record will be preserved (nothing in the internal implementation ensures that - check comments and links [here](https://stackoverflow.com/q/33878370/6910411)). – zero323 Apr 25 '18 at 11:42
  • I don't understand why this is marked as a duplicate of the other question. Can you clarify a little more? How should I do it? – Michail N Apr 25 '18 at 11:43
  • "Drop later records" is the same as "take the oldest one". Define the order (on `"timestampCol"`), define the grouping (the columns you use as `subset` for `dropDuplicates`), and choose the first row according to the grouping - that is all there is to do here. `max` / `min` with `struct`s usually scales best, independent of the data properties. And as mentioned above, `orderBy` with `dropDuplicates` is the same as `orderBy` with `groupBy` with `first`, which is not guaranteed to work. – zero323 Apr 25 '18 at 11:47
