I'm fairly new to Spark and SQL. I am trying to add a column to my df (which I will then save to a Delta table) that gives each record/row a unique id and increments it every time that specific record is updated.

I was trying to do the following:

SELECT etc,
CONCAT(somerows1) as id1,
ROW_NUMBER() OVER(PARTITION BY somerows1 ORDER BY (SELECT NULL)) AS versionid
FROM etc

Here somerows1 is the concatenation of several columns that together identify a unique record. I don't need the records in any particular order, which is why I chose ORDER BY (SELECT NULL).

I get the following error:

Error in SQL statement: AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets; line 1 pos 0;

Does anyone have any idea on how to fix this?

Thanks

J.Doe
  • As [this](https://stackoverflow.com/questions/53294809/spark-non-time-based-windows-are-not-supported-on-streaming-dataframes-dataset) suggests, maybe the problem is that you need to specify a time-based column in the partition. – Let's try Aug 19 '20 at 15:36
  • @Let'stry I have tried adding a timestamp column to the partition and I still get the same error – J.Doe Aug 19 '20 at 15:42

2 Answers

I solved this problem by using the foreachBatch sink on the .writeStream. This lets you supply a function that is applied to each micro-batch, and inside that function the micro-batch is treated as a static/batch DataFrame, so non-time-based window functions are allowed.

In Scala the code would look something like this:

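import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{row_number, lit}

// Explicit Unit return type so the function matches what foreachBatch expects
val saveWithWindowFunction: (DataFrame, Long) => Unit = (sourceDf: DataFrame, batchId: Long) => {

  // Number the rows within each somerows1 group; the constant ordering leaves the order arbitrary
  val windowSpec = Window
    .partitionBy("somerows1")
    .orderBy(lit(null))

  // withColumn returns a new DataFrame, so keep the result instead of discarding it
  val versionedDf = sourceDf
    .withColumn("versionid", row_number().over(windowSpec))

  //... save the dataframe using: versionedDf.write.save()
}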

With the .writeStream calling your function:

  .writeStream
  .format("delta")
  .foreachBatch(saveWithWindowFunction)
  .start()
Vhon Newmahn

What you're looking for is aggregations over a sliding event-time window. Check the documentation and examples here.
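
For illustration, here is a minimal sketch of what a sliding event-time window aggregation could look like in Scala. It is not taken from the linked documentation: it uses Spark's built-in "rate" test source (which emits `timestamp` and `value` columns) as a stand-in for the real stream, derives a hypothetical `somerows1` key from `value`, and picks the window length, slide, and watermark arbitrarily. Note that it counts rows per key per window rather than assigning a row number, so it may or may not fit the versioning use case in the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, window}

object SlidingWindowSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("sliding-window-sketch")
      .master("local[*]")
      .getOrCreate()

    // Assumption: the built-in "rate" source stands in for the real input stream.
    // It produces two columns: `timestamp` (event time) and `value` (a counter).
    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      // Hypothetical record key; in the question this would be somerows1.
      .withColumn("somerows1", col("value") % 3)

    // Time-based window: 30-second windows that slide every 10 seconds.
    // The watermark bounds how long Spark waits for late rows.
    val counts = events
      .withWatermark("timestamp", "1 minute")
      .groupBy(
        window(col("timestamp"), "30 seconds", "10 seconds"),
        col("somerows1"))
      .agg(count("*").as("updates"))

    // Print updated window counts to the console as each micro-batch completes.
    val query = counts.writeStream
      .outputMode("update")
      .format("console")
      .option("truncate", "false")
      .start()

    query.awaitTermination()
  }
}
```

Because the grouping includes a time window on the event-time column, this kind of query is accepted on a streaming DataFrame, unlike ROW_NUMBER() over a non-time-based window.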

jayrythium