
Merging streaming data with static datasets is a great feature of Structured Streaming, but on every batch the static datasets are reloaded from their data sources. Since these sources are not always that dynamic, it would be a performance gain to cache a static dataset for a specified period of time (or number of batches): after that period/number of batches the dataset is reloaded from the source, otherwise it is served from the cache.
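
For context, a stripped-down sketch of the kind of stream-static join I mean (not my actual code; the paths, topic and column names are just placeholders):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("stream-static-join").getOrCreate()

  // Static side: a reference dataset read from files
  val staticDf = spark.read.parquet("/data/reference")

  // Streaming side: events arriving from Kafka
  val streamingDf = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "events")
    .load()
    .selectExpr("CAST(key AS STRING) AS id")

  // Stream-static join: the static side is re-read from its source on every
  // micro-batch unless it is cached
  val joined = streamingDf.join(staticDf, "id")

  joined.writeStream
    .format("console")
    .option("checkpointLocation", "/tmp/sparkCheckpoint/join")
    .start()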

In Spark Streaming I managed this with a cached dataset that I unpersisted and reloaded after a specified number of batch runs, but for some reason this no longer works with Structured Streaming.
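
Roughly, the DStream-based version looked like the sketch below (simplified; the source, paths and the refresh interval are placeholders):

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val spark = SparkSession.builder().appName("dstream-refresh").getOrCreate()
  val ssc = new StreamingContext(spark.sparkContext, Seconds(30))

  var staticDf = spark.read.parquet("/data/reference")
  staticDf.persist()
  var batchCounter = 0

  // Stand-in for whatever DStream the job actually consumes (e.g. from Kafka)
  val lines = ssc.socketTextStream("localhost", 9999)

  lines.foreachRDD { rdd =>
    batchCounter += 1                       // foreachRDD runs on the driver once per batch
    if (batchCounter >= 10) {               // refresh the static dataset every 10 batches
      staticDf.unpersist()
      staticDf = spark.read.parquet("/data/reference")
      staticDf.persist()
      batchCounter = 0
    }
    // ... turn rdd into a DataFrame, join it with staticDf and write the result ...
  }

  ssc.start()
  ssc.awaitTermination()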

Any suggestions on how to do this with Structured Streaming?

Chris
  • Could you show us your code with Structured Streaming? I have in mind `mapGroupsWithState` to handle the custom logic of refreshing the static dataset – Paul Leclercq Dec 13 '17 at 16:57
  • What do you have in mind with mapGroupsWithState? I'm wondering whether this is an efficient way, since mapGroupsWithState operates at the record level (not on the DataFrame/Dataset as a whole) and could run multiple times during a batch. With Spark Streaming I managed this by using a batch counter and refreshing (cache/persist) the dataset when the counter hits a threshold, but with Structured Streaming the batch counter isn't working anymore (I guess the incrementer runs once, and not on every batch) – Chris Dec 15 '17 at 08:26
  • You're right about mapGroupsWithState. I would create a custom sink to be able to refresh my dataset inside the `addBatch` method every X batches (based on the batchId). Hope this example can help: https://github.com/polomarcus/Spark-Structured-Streaming-Examples/blob/master/src/main/scala/cassandra/StreamSinkProvider/CassandraSink.scala#L39 – Paul Leclercq Dec 15 '17 at 15:28
  • Interesting example! But is a custom Sink the only way to do this? A sink is a termination point, and refreshing a static dataset there also means doing data processing in the sink (joining, filtering, etc. with the static dataset). Maybe you want to add state with mapGroupsWithState after joining the static dataset. I guess a sink is not the best place if you want to do data processing as well. – Chris Dec 18 '17 at 08:36
  • Does this answer your question? [Stream-Static Join: How to refresh (unpersist/persist) static Dataframe periodically](https://stackoverflow.com/questions/66154867/stream-static-join-how-to-refresh-unpersist-persist-static-dataframe-periodic) – Michael Heil Mar 07 '21 at 20:33
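
To illustrate the custom-sink idea from the comments above, a rough sketch of an `addBatch`-based refresh could look like the following (it relies on Spark's internal `Sink` trait, as the linked CassandraSink example does; the class name and the `refresh` callback are made up):

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.execution.streaming.Sink

  // Refresh the cached static Dataframe every N micro-batches, then write the batch
  class RefreshingSink(refreshEveryNBatches: Long, refresh: () => Unit) extends Sink {
    override def addBatch(batchId: Long, data: DataFrame): Unit = {
      if (batchId % refreshEveryNBatches == 0) {
        refresh()   // e.g. unpersist -> reload -> persist the static Dataframe
      }
      // ... write `data` to the actual target (Cassandra in the linked example) ...
    }
  }

To actually plug this in, the sink has to be exposed through a `StreamSinkProvider` and referenced via `.format(...)`, as the linked example shows.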

1 Answer


I have developed a solution for another question, Stream-Static Join: How to refresh (unpersist/persist) static Dataframe periodically, which might also help to solve your problem:

You could do this by making use of the streaming scheduling capabilities that Structured Streaming provides.

You can trigger the refresh (unpersist -> load -> persist) of a static Dataframe by creating an artificial "Rate" stream that refreshes the static dataset periodically. The idea is to:

  1. Load the staticDataframe initially and keep as var
  2. Define a method that refreshes the static Dataframe
  3. Use a "Rate" Stream that gets triggered at the required interval (e.g. 1 hour)
  4. Read actual streaming data and perform join operation with static Dataframe
  5. Within that Rate Stream, have a foreachBatch sink that calls the refresher method

The following code runs fine with Spark 3.0.1, Scala 2.12.10 and Delta 0.7.0.

  // Imports needed for the snippets below
  import java.util.Calendar
  import org.apache.spark.sql.{Dataset, SaveMode}
  import org.apache.spark.sql.streaming.Trigger

  // 1. Load the staticDataframe initially and keep as `var`
  var staticDf = spark.read.format("delta").load(deltaPath)
  staticDf.persist()

  //  2. Define a method that refreshes the static Dataframe
  def foreachBatchMethod[T](batchDf: Dataset[T], batchId: Long) = {
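    // batchDf (the Rate stream's rows) is not used; the micro-batch only serves as a periodic trigger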
    staticDf.unpersist()
    staticDf = spark.read.format("delta").load(deltaPath)
    staticDf.persist()
    println(s"${Calendar.getInstance().getTime}: Refreshing static Dataframe from DeltaLake")
  }

  // 3. Use a "Rate" Stream that gets triggered at the required interval (e.g. 1 hour)
  val staticRefreshStream = spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .option("numPartitions", 1)
    .load()
    .selectExpr("CAST(value as LONG) as trigger")
    .as[Long]

  // 4. Read actual streaming data and perform join operation with static Dataframe
  // As an example I used Kafka as a streaming source
  val streamingDf = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
    .selectExpr("CAST(value AS STRING) as id", "offset as streamingField")

  val joinDf = streamingDf.join(staticDf, "id")

  val query = joinDf.writeStream
    .format("console")
    .option("truncate", false)
    .option("checkpointLocation", "/path/to/sparkCheckpoint")
    .start()

  // 5. Within that Rate Stream have a `foreachBatch` sink that calls refresher method
  staticRefreshStream.writeStream
    .outputMode("append")
    .foreachBatch(foreachBatchMethod[Long] _)
    .queryName("RefreshStream")
    .trigger(Trigger.ProcessingTime("5 seconds"))
    .start()

To make it a complete example, the Delta table was created as follows:

  val deltaPath = "file:///tmp/delta/table"

  import spark.implicits._
  val df = Seq(
    (1L, "static1"),
    (2L, "static2")
  ).toDF("id", "deltaField")

  df.write
    .mode(SaveMode.Overwrite)
    .format("delta")
    .save(deltaPath)
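
If you run this as a standalone application (rather than interactively in spark-shell), keep the driver alive for both started queries, for example:

  // Block until one of the started streaming queries terminates
  spark.streams.awaitAnyTermination()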
Michael Heil