I have a problem similar to this one, but I want to check for duplicates across multiple columns and keep the record with the oldest timestamp.
My approach was to create a proper timestamp column, order the dataframe by it, and then call dropDuplicates() (dropDuplicates() keeps the first record and deletes the following ones), and this works:
from pyspark.sql.functions import unix_timestamp
...
pattern = "yyyy-MM-dd hh:mm:ss"

# valid_from is the timestamp column.
# Parse it into a real timestamp and order the dataframe ascending by it.
df = df.withColumn("timestampCol",
                   unix_timestamp(df["valid_from"], pattern).cast("timestamp")) \
       .orderBy(["timestampCol"], ascending=True)

# Drop duplicates based on all columns except the timestamp ones,
# so that only the rows with the oldest timestamps remain.
df = df.dropDuplicates(subset=[x for x in df.columns if x not in ["valid_from", "timestampCol"]])
This code works fine for small datasets, but when I try it on a bigger dataset I run into severe performance issues. I found out that dropDuplicates() after orderBy() has atrocious performance. I tried to cache the dataframe, but it did not help much.
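By caching I mean nothing more elaborate than something along these lines (the count() is only there to force materialization; this is just a sketch of what I tried):

# Cache the sorted dataframe and force it to materialize,
# hoping the sort would not be recomputed by dropDuplicates().
df = df.cache()
df.count()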
The problem is that when the drop-duplicates step starts, the console shows
[Stage x:=============================> (1 + 1) / 2]
and it stays stuck there for almost 20 minutes.
So my questions are:
Why does dropDuplicates() after orderBy() have such bad performance? Is there another way to achieve the same goal (dropping duplicates on multiple columns while keeping the oldest value)?
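For example, would a window function with row_number() behave any better? Here is a rough, untested sketch of what I mean (same column names as in my code above; df_dedup is just a placeholder name):

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# Partition by every column that defines a duplicate, order each group by the
# parsed timestamp, and keep only the first (oldest) row of each group.
dedup_cols = [c for c in df.columns if c not in ["valid_from", "timestampCol"]]
w = Window.partitionBy(*dedup_cols).orderBy(col("timestampCol").asc())

df_dedup = df.withColumn("rn", row_number().over(w)) \
             .filter(col("rn") == 1) \
             .drop("rn")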
Does the console output mean that only 2 executors are running at that time? And if so, how can I increase them? I submit my application to YARN with: --num-executors 5 --executor-cores 5 --executor-memory 20G. Why do I have only two executors running at this particular point, and how can I increase them for this step?
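In case it is relevant, this is roughly how I would check and change the number of partitions of the dataframe before the expensive step (the value 200 is only an illustrative example, not a tuned number):

# Check how many partitions the dataframe currently has.
print(df.rdd.getNumPartitions())

# If the number is very low, repartition before the heavy step.
df = df.repartition(200)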