
I am new to Spark and have a setup where I want to read in two streams of data, each from a Kafka topic, using Spark Structured Streaming 2.4. I then want to join these two streams, likely with a very large window of time.

val df1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", endpoint)
  .option("subscribe", topic1)
  .option("startingOffsets", <?>) 
  .load()

val df2 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", endpoint)
  .option("subscribe", topic2)
  .option("startingOffsets", <?>) 
  .load()

val joineddf = df1.join(
  df2,
  expr(
    raw"""
            key == key2 AND
            timestamp1 <= timestamp2 + interval 1 day
        """),
  joinType = "leftOuter")

Now, when I restart/upgrade the application, I want to make sure that a). My startingOffsets are early enough such that I don't skip joining any events and b). I minimize the amount of events I have to re-join (although I may have to sacrifice b for a). I'm wondering, what is the best way of checkpointing in this scenario? I know that I can add checkpointing to the join operation like so:

val joineddf = df1.join(
  df2,
  expr(
    raw"""
            key == key2 AND
            timestamp1 <= timestamp2 + interval 1 day
        """),
  joinType = "leftOuter")
  .checkpoint("/directory")

But that won't help me when providing the Kafka offsets at startup. I have also read through these two previous questions [1] [2], but they seem to only deal with checkpointing on a query (i.e. writeStream), whereas my concern is checkpointing the reading of the two different streams. I do not want to pass "latest" as the starting offset, since I imagine data that was already read on the previous application run, but was still in "limbo" waiting to be joined, would be skipped.

limotl3

1 Answer


You need to use checkpointing on the writeStream: it tracks the offsets for all sources used in your query and stores them in the checkpoint directory, so when you restart the application, it reads those offsets back for all sources and continues from them. The offsets that you specify in readStream only apply when the checkpoint directory doesn't exist yet; after the first run it is populated with the real offsets, and the value specified in the options won't be used (until you remove the checkpoint directory).
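For example, a checkpointed sink could look like the sketch below. The sink format and both paths are placeholders you'd replace with your own; the important part is checkpointLocation, which is where Structured Streaming persists the consumed Kafka offsets of both sources plus the state of the stream-stream join:

```scala
// Sketch: write the joined stream with a checkpoint directory.
// On restart with the same checkpointLocation, Spark resumes from
// the last committed offsets of df1 and df2 and restores join state,
// so startingOffsets in readStream is ignored.
val query = joineddf.writeStream
  .format("parquet")                                  // placeholder sink
  .option("path", "/output/joined")                   // placeholder path
  .option("checkpointLocation", "/checkpoints/join")  // placeholder path
  .start()

query.awaitTermination()
```

Note that the checkpoint directory should live on reliable storage (e.g. HDFS or S3) so it survives the restart/upgrade.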

Read the Structured Streaming documentation to understand how it works.

P.S. The checkpoint that you're using in the last example is a different thing: Dataset.checkpoint is the RDD-style lineage checkpoint, which materializes the data to truncate the query plan. It does not record Kafka offsets and is unrelated to Structured Streaming recovery.
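To illustrate the difference, Dataset.checkpoint is used on batch Datasets and requires a checkpoint directory set on the SparkContext (the path below is a placeholder):

```scala
// Lineage checkpoint on a batch Dataset: materializes the data and
// truncates the logical plan. This has nothing to do with the
// checkpointLocation option used by streaming queries.
spark.sparkContext.setCheckpointDir("/tmp/lineage-checkpoints") // placeholder
val truncated = someBatchDf.checkpoint() // someBatchDf: any batch Dataset
```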

Alex Ott