I want to go through a table consecutively, using the value calculated in the previous row when computing the current row. It seems a window function could do this.

from pyspark.sql import SparkSession
from pyspark.sql import Window
import pyspark.sql.functions as F

# Create a sample DataFrame
data = [
    (1, 1, 3),
    (2, 2, 4),
    (3, 3, 5),
    (4, 4, 6),
    (5, 8, 10)
]

columns = ["id", "start", "stop"]

spark = SparkSession.builder.master("local").appName("SelfJoin").getOrCreate()

df = spark.createDataFrame(data, columns)

# 1. Sort the DataFrame by 'start' and seed prev_start with the current start
df = df.sort("start").withColumn("prev_start", F.col("start"))

# 2. Define a window ordered by 'start'; lag(1) looks back exactly one row,
#    so an explicit rowsBetween(-1, -1) frame would be redundant
window = Window.orderBy("start")

df = df.withColumn("prev_start", F.lag("prev_start", 1).over(window))

# Styler.hide_index() was removed in pandas 2.0; hide(axis="index") replaces it
df.toPandas().style.hide(axis="index")

At row 1:

id  start  stop  prev_start
1   1      3     NaN

At row 2:

Extra: I need to specify that if the value in the previous row is missing, then the value in the current row is used instead: 1.0 (one possible reading of this rule is sketched after the table below).

id  start  stop  prev_start
1   1      3     NaN
2   2      4     1.0
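
In window terms, the fallback could be a coalesce over two lags. This is only a sketch: it handles a single step and does not propagate a value across several rows, which is the crux of my problem.

df = df.withColumn(
    "prev_start",
    # Take the previous row's prev_start; if that is null, fall back to
    # the previous row's start (one reading of the "missing" rule above)
    F.coalesce(
        F.lag("prev_start", 1).over(window),
        F.lag("start", 1).over(window),
    ),
)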

At row 3:

id  start  stop  prev_start
1   1      3     NaN
2   2      4     1.0
3   3      5     2.0

Looking back, prev_start should be assigned the previous row's value, which is 1:

id  start  stop  prev_start
1   1      3     NaN
2   2      4     1.0
3   3      5     1.0

At row 4:

id  start  stop  prev_start
1   1      3     NaN
2   2      4     1.0
3   3      5     1.0
4   4      6     3.0

prev_start should take the previous row's value as it is now (1), not what it was originally (2):

id  start  stop  prev_start
1   1      3     NaN
2   2      4     1.0
3   3      5     1.0
4   4      6     1.0

So now, in the actual application, I also do this for prev_end = max(prev_end, end):

  • as soon as this prev_end is less than the current row's start (and the data is ordered correctly), I now have a non-contiguous time frame (start to stop), which will run from prev_start to prev_end (a running-max sketch follows below)
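
For prev_end on its own, no recursion is needed: a window with an explicit frame gives the running maximum directly. A sketch, using stop for end since that is the sample column; prev_stop_max and is_gap are made-up names:

prior = Window.orderBy("start").rowsBetween(Window.unboundedPreceding, -1)

df = (
    df
    # Largest stop seen in all strictly earlier rows (null on the first row)
    .withColumn("prev_stop_max", F.max("stop").over(prior))
    # The time frame is non-contiguous where the current start exceeds it
    .withColumn("is_gap", F.col("start") > F.col("prev_stop_max"))
)

The tables below show what the lag-based attempt actually produces, versus what I want.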

Result

id  start  stop  prev_start
1   1      3     NaN
2   2      4     1.0
3   3      5     2.0
4   4      6     3.0
5   8      10    4.0

But I want this result:

id  start  stop  prev_start
1   1      3     NaN
2   2      4     1.0
3   3      5     1.0
4   4      6     1.0
5   8      10    1.0

Because in this case the values are calculated row by row: once the value is set to 1, every subsequent prev_start would also become 1.
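
One way to get this without true row-by-row recursion is a gaps-and-islands approach, under the assumption that the propagation is driven by the contiguity of the intervals (new_block, block_id, and block_first_start are made-up names; this is a sketch, not a tested solution):

ordered = Window.orderBy("start")
prior = ordered.rowsBetween(Window.unboundedPreceding, -1)
running = ordered.rowsBetween(Window.unboundedPreceding, 0)
block = Window.partitionBy("block_id").orderBy("start")

result = (
    df
    # Largest stop among all strictly earlier rows (null on the first row)
    .withColumn("prev_stop_max", F.max("stop").over(prior))
    # A new contiguous block begins when no earlier row reaches this start
    .withColumn("new_block",
                F.when(F.col("start") > F.col("prev_stop_max"), 1).otherwise(0))
    # A running sum of the flags numbers the blocks
    .withColumn("block_id", F.sum("new_block").over(running))
    # The value to propagate is the first start of the row's block
    .withColumn("block_first_start", F.first("start").over(block))
    # prev_start looks one row back, so lag the propagated value once
    .withColumn("prev_start", F.lag("block_first_start", 1).over(ordered))
    .drop("prev_stop_max", "new_block", "block_first_start")
)

On the sample data this should give NaN, 1.0, 1.0, 1.0, 1.0 for prev_start, matching the wanted result; prev_end should follow the same pattern with max("stop") over the block.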

Harlan Nelson
  • "the values are calculated row by row" what calculation? please elaborate how you are deriving 1.0 on the 2nd row or 3rd row and so on. – Emma Apr 07 '23 at 18:54
  • I tried to make this example as simple as possible. In this case, once the value is assigned 1, that value should get propagated instead of just the first lag in the original data. – Harlan Nelson Apr 07 '23 at 19:02
  • 1
    this is still not clear to me yet. how the value is assigned 1 on the 2nd row? Please explain why first row is NaN and 2nd row is 1 "the first assignment" (why not first row). If you want to simplify the calculation, you can have a fake equation something like `(stop - start of when id = 1) = 2`. 3rd row and onward same result (=2) or 3rd row, do you want to make a calculation using the result from the previous result like `2 + (stop - start of when id = 2)`? – Emma Apr 07 '23 at 19:18
  • It turns out https://stackoverflow.com/questions/75915126/replicating-a-sas-retain-statement-in-pyspark gives an answer to this type of question, although the answer is very complex. – Harlan Nelson Apr 07 '23 at 20:43

0 Answers