I am new to Spark. I have a setup where I want to read two streams of data, each from its own Kafka topic, using Spark Structured Streaming 2.4, and then join the two streams, likely over a very large time window:
val df1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", endpoint)
  .option("subscribe", topic1)
  .option("startingOffsets", <?>)
  .load()

val df2 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", endpoint)
  .option("subscribe", topic2)
  .option("startingOffsets", <?>)
  .load()
import org.apache.spark.sql.functions.expr

val joineddf = df1.join(
  df2,
  expr(
    raw"""
      key == key2 AND
      timestamp1 <= timestamp2 + interval 1 day
    """),
  joinType = "leftOuter")
Now, when I restart or upgrade the application, I want to make sure that (a) my startingOffsets are early enough that I don't skip joining any events, and (b) I minimize the number of events I have to re-join (although I may have to sacrifice (b) for (a)). What is the best way of checkpointing in this scenario? I know that I can add checkpointing to the join operation like so:
spark.sparkContext.setCheckpointDir("/directory")

val joineddf = df1.join(
  df2,
  expr(
    raw"""
      key == key2 AND
      timestamp1 <= timestamp2 + interval 1 day
    """),
  joinType = "leftOuter")
  .checkpoint()
But that won't help me when providing the Kafka offsets at startup. I have also read through these two previous questions [1] [2], but they only seem to deal with checkpointing on a query (i.e. a writeStream), whereas my concern is checkpointing the reading of the two different streams. I do not want to pass "latest" as the starting offset, since I imagine data that was already read on the previous application run but was still in "limbo" waiting to be joined would then be skipped.
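For reference, this is what I understand those questions to mean by checkpointing a query; the sink format and paths here are placeholders I made up:

// Query-level checkpointing as described in [1] and [2], as far as I can tell:
// the checkpointLocation directory stores the source (Kafka) offsets and the
// join state for this one query.
val query = joineddf.writeStream
  .format("parquet")                                    // placeholder sink
  .option("path", "/output/joined")                     // placeholder output path
  .option("checkpointLocation", "/checkpoints/joined")  // placeholder checkpoint path
  .start()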