
I have a dataframe like so:

|SEQ_ID |TIME_STAMP             |RESULT          |
+-------+-----------------------+----------------+
|3879859|2021-08-31 19:54:53.88 |25.9485244750994|
|3879859|2021-08-31 21:16:06.228|35.9163284302007|
|3879859|2021-08-31 22:28:46.306|41.9778823852006|
|3879859|2021-08-31 22:28:46.306|41.9778823852006|
|3879859|2021-08-31 23:12:08.058|39.9112701415998|
|3879859|2021-08-31 23:17:35.796|33.0476760864009|
|3879859|2021-08-31 23:47:17.383|60.2846145630007|
|3879859|2021-09-01 00:00:26.722|67.0690536499006|
|3879859|2021-09-01 00:00:26.722|67.0690536499006|
|3879859|2021-09-01 00:02:07.825|67.8424835205007|

For normal percentile calculation in pyspark I use the following:

from pyspark.sql import functions as f

df.groupBy('SEQ_ID')\
.agg(f.expr('percentile(RESULT, 0.25)').alias('Q1'),
     f.expr('percentile(RESULT, 0.50)').alias('Median'),
     f.expr('percentile(RESULT, 0.75)').alias('Q3'))

But that aggregates over all of the data grouped by SEQ_ID. I want to calculate Q1, the median, and Q3 for each row, using only that row and the rows above it:

|SEQ_ID |TIME_STAMP             |RESULT          |Q1   |Median|Q3   |
+-------+-----------------------+----------------+-----+------+-----+
|3879859|2021-08-31 19:54:53.88 |25.9485244750994|     |      |     |
|3879859|2021-08-31 21:16:06.228|35.9163284302007|     |      |     |
|3879859|2021-08-31 22:28:46.306|41.9778823852006|     |      |     |
|3879859|2021-08-31 22:28:46.306|41.9778823852006|     |      |     |
|3879859|2021-08-31 23:12:08.058|39.9112701415998|     |      |     |
|3879859|2021-08-31 23:17:35.796|33.0476760864009|     |      |     |
|3879859|2021-08-31 23:47:17.383|60.2846145630007|     |      |     |
|3879859|2021-09-01 00:00:26.722|67.0690536499006|     |      |     |
|3879859|2021-09-01 00:00:26.722|67.0690536499006|     |      |     |
|3879859|2021-09-01 00:02:07.825|67.8424835205007|     |      |     |

So the Q1, Median, and Q3 for the first row would all be 25.9485244750994. For the 2nd row, the percentiles would be calculated from 25.9485244750994 and 35.9163284302007, and so on.

If I define a window such that

w = Window.partitionBy('SEQ_ID')\
          .orderBy(col('TIME_STAMP').asc())\
          .rangeBetween(Window.unboundedPreceding, 0)

Would the following code work?

df.groupBy('SEQ_ID')\
.agg(f.expr('percentile(RESULT, 0.25)').alias('Q1'),
     f.expr('percentile(RESULT, 0.50)').alias('Median'),
     f.expr('percentile(RESULT, 0.75)').alias('Q3')).over(w)
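
Or, if the groupBy form can't take `.over(w)` directly, would applying the window to each percentile expression be the way to do it? A rough sketch of what I mean (untested; uses the `f` alias and the window `w` from above):

# each percentile computed over the cumulative window w, row by row
df.withColumn('Q1',     f.expr('percentile(RESULT, 0.25)').over(w))\
  .withColumn('Median', f.expr('percentile(RESULT, 0.50)').over(w))\
  .withColumn('Q3',     f.expr('percentile(RESULT, 0.75)').over(w))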
  • @ggordon Thank you. yes, it does. It's pretty much what I expected. However, I underestimated the cumulative effect and I changed the `percentile` to `percentile_approx` since I have a very large dataset. – thentangler Oct 20 '21 at 16:44
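
For reference, the `percentile_approx` variant mentioned in the comment is a drop-in replacement inside the same window expression; a sketch (the third argument is the accuracy, and the 10000 shown here is just Spark's default, not a value from the original post):

df.withColumn('Q1', f.expr('percentile_approx(RESULT, 0.25, 10000)').over(w))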

1 Answer


You can create a new "fake" ID column with `monotonically_increasing_id`, then either group by it or use it as the ordering column of a window; that way the calculation runs row by row.
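
A rough sketch of that idea (untested; the `row_id`, `w2`, and `df2` names are just placeholders):

from pyspark.sql import functions as f
from pyspark.sql.window import Window

# add a monotonically increasing (not necessarily consecutive) row ID
df2 = df.withColumn('row_id', f.monotonically_increasing_id())

# cumulative window ordered by the generated ID, strictly row by row
w2 = Window.partitionBy('SEQ_ID').orderBy('row_id')\
           .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df2.withColumn('Q1',     f.expr('percentile(RESULT, 0.25)').over(w2))\
   .withColumn('Median', f.expr('percentile(RESULT, 0.50)').over(w2))\
   .withColumn('Q3',     f.expr('percentile(RESULT, 0.75)').over(w2))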

  • Thank you for the suggestion. I thought about using that, but it would load the entire dataset to the driver node and create a lot of shuffle writes. I tried using `row_number()` but it was still slow. The `window` function seemed to be faster. Although it was still 50 mins longer than if I had just used a `groupBy` – thentangler Oct 20 '21 at 16:46