
The PySpark SQL functions reference on the row_number() function says

returns a sequential number starting at 1 within a window partition

implying that the function works only on windows. Trying

df.select('*', row_number())

predictably gives a

Window function row_number() requires an OVER clause

exception.

Now, .over() seems to work only with WindowSpec because

from pyspark.sql.functions import window, row_number
...
df.select('*', row_number().over(window('time', '5 minutes')))

gives a

TypeError: window should be WindowSpec

exception. However, according to this comment on the ASF Jira:

By time-window we described what time windows are supported in SS natively. http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#types-of-time-windows

Window spec is not supported. This defines the boundary of window as non-timed manner, the offset(s) of the row, which is hard to track in streaming context.

WindowSpec is generally not supported in Structured Streaming, which leads to the conclusion that the row_number() function is not supported in Structured Streaming either. Is that correct? I just want to make sure I'm not missing anything here.

Kai Roesner
  • Not an answer attempt, but I think there is a bit of confusion between the [`window()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.window.html#pyspark.sql.functions.window) SQL function and the [`Window`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/window.html) class. – mazaneicha Jan 23 '23 at 19:32
  • Not confusion, but intention: I was trying to use the `window()` SQL function instead of the `Window` class because the latter returns a `WindowSpec` which is not supported in Structured Streaming (cf. https://stackoverflow.com/questions/53294809 and https://stackoverflow.com/questions/63490147). – Kai Roesner Jan 24 '23 at 08:56

1 Answer

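First, your imports are wrong. `row_number().over()` expects a `WindowSpec`, which is built from the `Window` class, not from the `window()` function: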

from pyspark.sql import Window
from pyspark.sql.functions import row_number

Second, try it like this:

partition_columns = Window.partitionBy(
    df.column1,
    df.column2,
    ...
).orderBy(df.col...)

df = df.withColumn('your_new_column_rank', row_number().over(partition_columns))

We usually use window functions to deduplicate records in Structured Streaming. The documentation says this is not possible on a streaming DataFrame because, unlike in batch mode, the function cannot access data that has already been saved. You can, however, try setting a watermark, like this:

df = df.withWatermark("timestamp", "10 minutes").withColumn('your_new_column_rank', row_number().over(partition_columns))

Alternatively, you can try combining a watermark with `dropDuplicates()`.

Another way to do it is through `foreachBatch`:

def func(batch_df, batch_id):
    # batch_df is a regular (non-streaming) DataFrame, so a WindowSpec works here
    partition_columns = Window.partitionBy(
        batch_df.column1,
        batch_df.column2,
        ...
    ).orderBy(batch_df.col...)

    batch_df = batch_df.withColumn(
        'your_new_column_rank',
        row_number().over(partition_columns))
    ...

writer = sdf.writeStream.foreachBatch(func)

As shown above, inside the handler you get a micro-batch DataFrame that is not a streaming DataFrame, so you can use functions that are not available on a streaming one.

Vidar
  • I was trying to use the `window()` SQL function instead of the `Window` class because `Window.partitionBy()` returns a `WindowSpec` which is not supported in Structured Streaming (cf. https://stackoverflow.com/questions/53294809 and https://stackoverflow.com/questions/63490147). Also, I don't want the `row_number()` over partitions but the whole data set. Using the `foreach` or `foreachBatch` sink would work, of course, but then I cannot leverage checkpointing to make my job restartable after a crash. – Kai Roesner Jan 24 '23 at 08:52
  • PS: Because in the micro batch handler I would have to keep the row count as an internal state which will not be included in Spark's checkpointing. – Kai Roesner Jan 24 '23 at 09:32
  • Instead of keeping it as internal state, you can retrieve your currently saved data, apply the function, and then save or merge it again. But yes, it's one of the challenges when working with Structured Streaming. – Vidar Jan 25 '23 at 12:41
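
The idea from the last comment (derive the next row number from what is already saved instead of holding a counter in memory) can be sketched without Spark. This is only an illustration under stated assumptions: `number_batch`, the `sink` list, and the dict layout are hypothetical stand-ins. In a real job the offset would presumably be read from the target table inside the `foreachBatch` handler, and the batch rows would be numbered with `row_number()` over a `Window` on the batch DataFrame plus that offset.

```python
from typing import Any, Dict, List


def number_batch(batch_rows: List[Any], sink: List[Dict[str, Any]], batch_id: int) -> None:
    """Append batch_rows to sink with row numbers that continue across batches.

    Hypothetical helper: `sink` is a plain list standing in for the saved data.
    """
    # A batch may be replayed after a restart; skip it if it was already written.
    if any(row["batch_id"] == batch_id for row in sink):
        return

    # Derive the offset from the saved data rather than from in-memory state.
    offset = max((row["row_number"] for row in sink), default=0)

    # Number the new rows starting right after the highest saved row number.
    for number, value in enumerate(batch_rows, start=offset + 1):
        sink.append({"row_number": number, "batch_id": batch_id, "value": value})


sink: List[Dict[str, Any]] = []
number_batch(["a", "b"], sink, batch_id=0)
number_batch(["c"], sink, batch_id=1)
number_batch(["c"], sink, batch_id=1)  # simulated replay after a crash: no-op

numbers = [row["row_number"] for row in sink]
values = [row["value"] for row in sink]
```

Because the offset and the replay check both come from the saved data, nothing has to live outside Spark's checkpoint. The trade-off is an extra read of the sink on every micro-batch.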