The situation is the following: I have a time-series DataFrame with an index column `id` that orders the sequence, and a column holding some discrete value, like this:
id value
0 A
1 A
2 B
3 C
4 A
5 A
6 A
7 B
I now want to reduce each run of consecutive duplicates to its first row, so that it looks like this:
id value
0 A
2 B
3 C
4 A
7 B
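For reference, the reduction I'm after (keep only the first row of each run of equal values) can be sketched in plain Python; `rows` here is a hypothetical list of (id, value) tuples standing in for the DataFrame:

```python
def reduce_consecutive(rows):
    """Keep only the first (id, value) pair of each run of equal values."""
    out = []
    prev = object()  # sentinel that never compares equal to a real value
    for rid, value in rows:
        if value != prev:
            out.append((rid, value))
            prev = value
    return out

rows = [(0, "A"), (1, "A"), (2, "B"), (3, "C"),
        (4, "A"), (5, "A"), (6, "A"), (7, "B")]
print(reduce_consecutive(rows))
# [(0, 'A'), (2, 'B'), (3, 'C'), (4, 'A'), (7, 'B')]
```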
I've come up with an approach using a window with lag() and when(), filtering afterwards. The problem is that the window requires a partition column. What I want is to drop the consecutive duplicates within each partition first, and after that check the partition borders (since the window works per partition, consecutive duplicates across partition borders still survive).
from pyspark.sql import Window
from pyspark.sql.functions import col, lag, when

# Assign each row to a block so the window has a partition column.
df_with_block = df.withColumn(
    "block", (col("id") / df.rdd.getNumPartitions()).cast("int"))

window = Window.partitionBy("block").orderBy("id")

# True for the first row of each run of equal values within a block.
get_last = when(lag("value", 1).over(window) == col("value"), False).otherwise(True)

reduced_df = (df_with_block
              .withColumn("reduced", get_last)
              .where(col("reduced"))
              .drop("reduced"))
In the first step, I create a new DataFrame with roughly uniformly distributed blocks by integer-dividing the ids. get_last then holds a boolean that is False whenever the current row's value equals the preceding one within its block, and reduced_df filters those duplicates out. The problem is now the partition borders:
id value
0 A
2 B
3 C
4 A
6 A
7 B
As you can see, the row with id=6 didn't get removed, since it was processed in a different partition than the row with id=5. I'm considering different ideas to solve this:
- Use coalesce() to combine partitions and filter again?
- Find some way to access the first value of the next partition
- Use an RDD instead of a DataFrame to do all of this
- Change my partitioning function so that it doesn't cut in places where duplicates are located (how?)
I'm curious how that could work out.