I have an input DataFrame with three columns: Time, Name, and Flag. I would like to aggregate consecutive rows that share the same Name and Flag into a single row with Start and End columns.

Input data frame

Time Name Flag
5/1/2023 1:01 Peter 1
5/1/2023 1:02 Peter 1
5/1/2023 1:03 Peter 1
5/1/2023 1:04 Peter 0
5/1/2023 1:05 Peter 0
5/1/2023 1:06 Peter 1
5/1/2023 1:07 Peter 1
5/1/2023 1:08 Peter 1
5/1/2023 1:01 John 1
5/1/2023 1:02 John 0
5/1/2023 1:03 John 0
5/1/2023 1:04 John 0
5/1/2023 1:05 John 0
5/1/2023 1:06 John 0
5/1/2023 1:07 John 1
5/1/2023 1:08 John 1
5/2/2023 1:10 Peter 1
5/2/2023 1:11 Peter 1
5/2/2023 1:20 John 0
5/2/2023 1:21 John 0
5/2/2023 1:22 John 0

Output data frame

Start End Name Flag
5/1/2023 1:01 5/1/2023 1:03 Peter 1
5/1/2023 1:04 5/1/2023 1:05 Peter 0
5/1/2023 1:06 5/1/2023 1:08 Peter 1
5/2/2023 1:10 5/2/2023 1:11 Peter 1
5/1/2023 1:01 5/1/2023 1:01 John 1
5/1/2023 1:02 5/1/2023 1:06 John 0
5/1/2023 1:07 5/1/2023 1:08 John 1
5/2/2023 1:20 5/2/2023 1:22 John 0

In this case, consecutive rows means consecutive in time.

1:08 and 1:10 are not combined because there is a gap (1:09 is missing) between those two rows.

Can you please tell me how I can do that?

n179911a
  • Note that a Spark DataFrame is unordered. The description "where the Name and Flag have the same value" is not enough to derive what you want. What do you consider "consecutive"? In particular, what is the logic by which Peter 1:08 and Peter 1:10 are not combined? – Emma May 08 '23 at 19:32
  • What have you tried? Have you checked [this](https://stackoverflow.com/a/74781106/10452700)? Possible duplicate of [grouping consecutive rows in PySpark Dataframe](https://stackoverflow.com/q/51309693/10452700). Maybe `df.groupby(["Name", "start_date", "end_date"]).filter(....)` – Mario May 08 '23 at 19:37
  • 1:08 and 1:10 are not combined because there is a gap (1:09 is missing) between the rows. – n179911a May 08 '23 at 19:39
  • @n179911a One solution is to refill the gaps to keep the data sequenced: check [this post](https://stackoverflow.com/q/69900563/10452700) and fill in missing info with something or NaN – Mario May 08 '23 at 19:49
  • @n179911a That is good. Could you add the clarification to your post? Also check the links and see whether you can replicate them on your data. – Emma May 08 '23 at 20:00

1 Answer

First, create a grouping column that captures your condition. A general technique is to build a flag that is 1 on rows where a new group should start and 0 on rows that should be merged into the previous group. A cumulative sum over this flag then yields the group IDs you want.
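As a plain-Python illustration of the flag-then-cumulative-sum idea (not Spark code; the helper name `group_ids` and its one-minute `max_gap` default are assumptions made for this sketch), here is how it plays out on Peter's rows from the question:

```python
from datetime import datetime, timedelta
from itertools import accumulate


def group_ids(rows, max_gap=timedelta(minutes=1)):
    """Return one group id per row.

    rows: list of (timestamp, flag) tuples for a single Name,
    already sorted by timestamp.
    """
    boundaries = []
    for i, (ts, flag) in enumerate(rows):
        if i == 0:
            # The first row always starts a new group.
            boundaries.append(1)
        else:
            prev_ts, prev_flag = rows[i - 1]
            # 1 if the flag changed or the time gap exceeds max_gap, else 0.
            boundaries.append(int(flag != prev_flag or ts - prev_ts > max_gap))
    # The running sum of the boundary flags is the group id.
    return list(accumulate(boundaries))


fmt = "%m/%d/%Y %H:%M"
peter = [(datetime.strptime(t, fmt), f) for t, f in [
    ("5/1/2023 1:01", 1), ("5/1/2023 1:02", 1), ("5/1/2023 1:03", 1),
    ("5/1/2023 1:04", 0), ("5/1/2023 1:05", 0),
    ("5/1/2023 1:06", 1), ("5/1/2023 1:07", 1), ("5/1/2023 1:08", 1),
    ("5/2/2023 1:10", 1), ("5/2/2023 1:11", 1),
]]

peter_groups = group_ids(peter)
print(peter_groups)  # [1, 1, 1, 2, 2, 3, 3, 3, 4, 4]
```

Each distinct id corresponds to one output row; taking the minimum and maximum timestamp per id gives Start and End.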

Your conditions are:

from pyspark.sql import Window, functions as F

# convert Time to timestamp
df = df.withColumn('timestamp', F.to_timestamp('Time', 'M/d/yyyy H:mm'))

w = Window.partitionBy('Name').orderBy('timestamp')

# If previous Flag is different from current Flag
flag_changed = F.lag('Flag').over(w) != F.col('Flag')

# OR previous timestamp is more than 1 minute ago
time_gap = ((F.col('timestamp').cast('long') - F.lag('timestamp').over(w).cast('long')) / 60) > 1

With these conditions, create the groupings as a `grp` column and aggregate on that column.

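from pyspark.sql import Window, functions as F

w = Window.partitionBy('Name').orderBy('timestamp')
df = (df.withColumn('timestamp', F.to_timestamp('Time', 'M/d/yyyy H:mm'))
      # 1 marks the start of a new group, 0 continues the previous one
      .withColumn('grp', (F.lag('Flag').over(w).isNull()
                          | (F.lag('Flag').over(w) != F.col('Flag'))
                          | (((F.col('timestamp').cast('long') - F.lag('timestamp').over(w).cast('long')) / 60) > 1)).cast('int'))
      # running sum of the boundary flags gives a group id per Name
      .withColumn('grp', F.sum('grp').over(w))
      .groupby('Name', 'grp')
      # aggregate on the parsed timestamp: min/max on the string Time column
      # would compare lexicographically (e.g. '10:00' sorts before '9:59')
      .agg(F.min('timestamp').alias('Start'), F.max('timestamp').alias('End'), F.first('Flag').alias('Flag')))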
Emma
  • Thanks @Emma for your detailed answer. I am going to try what you suggested. – n179911a May 10 '23 at 21:22
  • I get an error in this condition: `| (((F.col('timestamp') - F.lag('timestamp').over(w)).cast('long') /60) > 1).cast('int')`. This is the error I get: `cannot resolve 'CAST((timestamp - lag(timestamp, 1, NULL) OVER (PARTITION BY NAME ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING)) AS BIGINT)' due to data type mismatch: cannot cast interval day to second to bigint;` – n179911a May 12 '23 at 05:04
  • I updated my answer. I added casting both `timestamp`s to `long` before the subtraction. – Emma May 12 '23 at 13:54
  • Thanks @Emma. I have one more question: why do we need `F.lag('Flag').over(w).isNull()` as part of the `grp` condition? – n179911a May 13 '23 at 05:48
  • Without that condition, the first row for each Name gets null and is not merged correctly with the second row. Try it without the condition and look at Peter's data. – Emma May 15 '23 at 14:45
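
To make the last comment concrete, here is a minimal plain-Python sketch (not Spark code) that mimics how a SQL running sum treats NULLs: they are skipped, and the result stays NULL until the first non-NULL value appears. The helper name `running_sum_ignoring_nulls` is hypothetical, and `None` stands in for NULL. It assumes that comparing against a NULL `lag()` value yields NULL, which is standard SQL three-valued logic.

```python
def running_sum_ignoring_nulls(values):
    """Mimic a SQL SUM over a growing window.

    None (NULL) inputs are skipped; the result is None until
    the first non-None value has been seen.
    """
    out, total = [], None
    for v in values:
        if v is not None:
            total = v if total is None else total + v
        out.append(total)
    return out


# Boundary flags for Peter's first three rows (all Flag = 1, one minute apart).
# Without isNull(): lag() is NULL on the first row, so its flag is NULL.
without_null_check = [None, 0, 0]
# With isNull(): the first row becomes an explicit boundary.
with_null_check = [1, 0, 0]

print(running_sum_ignoring_nulls(without_null_check))  # [None, 0, 0]
print(running_sum_ignoring_nulls(with_null_check))     # [1, 1, 1]
```

In the first case the first row ends up with a different `grp` value (NULL) than the rows that follow it, so it is aggregated on its own; in the second case all three rows share one group id.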