
I am prototyping a Spark Structured Streaming (Spark 3.0) job that calculates aggregations and publishes the updates to Kafka. I need to calculate the all-time max date and max percentage (no windowing) for each group. The code works fine except with Kafka tombstone records (deletes) in the source stream: when the stream receives a Kafka record with a valid key and a null value, the max aggregate continues to include that record in the calculation. What are the best options to recalculate the aggregate without the deleted records once a delete is consumed from Kafka?

Example
Messages produced:

<"user1|1", {"user": "user1", "pct":30, "timestamp":"2021-01-01 01:00:00"}>  
<"user1|2", {"user": "user1", "pct":40, "timestamp":"2021-01-01 02:00:00"}>  
<"user1|2", null>

Spark code snippet:

import org.apache.spark.sql.functions.{col, from_json, max}
import org.apache.spark.sql.types.StringType

val usageStreamRaw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", usageTopic)
  .load()

// Parse the JSON payload; tombstones arrive with a null value, so every field parses to null.
val usageStream = usageStreamRaw
    .select(col("key").cast(StringType).as("key"),
            from_json(col("value").cast(StringType), valueSchema).as("json"))
    .selectExpr("key", "json.*")

// All-time max per user (no windowing).
val usageAgg = usageStream.groupBy("user")
      .agg(
        max("timestamp").as("maxTime"),
        max("pct").as("maxPct")
      )

val sq = usageAgg.writeStream
  .outputMode("update")
  .format("console")
  .option("truncate", "false")
  .start()

sq.awaitTermination()

For user1 the result in column maxPct is 40, but after the deletion it should be 30. Is there a good way to do this with Spark Structured Streaming?

Dustin V

1 Answer


You could make use of the Kafka timestamp of each message (aliased here so it does not clash with the timestamp field inside the JSON payload):

val usageStream = usageStreamRaw
    .select(col("key").cast(StringType).as("key"),
            from_json(col("value").cast(StringType), valueSchema).as("json"),
            col("timestamp").as("kafkaTimestamp"))  // Kafka message timestamp, renamed to avoid clashing with json.timestamp
    .selectExpr("key", "json.*", "kafkaTimestamp")

Then

  • select only the latest value for each key, and
  • filter out null values

before applying your aggregation on the maximum time and pct; a sketch of this is shown below.
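
For example, here is a minimal sketch of that idea. It assumes the same valueSchema, bootstrapServers and usageTopic as in the question; recomputeUsage, processBatch and kafkaTimestamp are just illustrative names. Since a single Spark 3.0 streaming query cannot chain the latest-value-per-key step with a second aggregation, the sketch recomputes the result from a batch read of the (ideally log-compacted) topic inside foreachBatch and uses the stream only as a trigger:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, from_json, max, row_number}
import org.apache.spark.sql.types.{StringType, StructType}

// Recompute the per-user aggregate from the (ideally log-compacted) topic.
// The latest record per Kafka key wins, so a tombstone overrides the older
// value for that key and is then dropped before aggregating.
def recomputeUsage(spark: SparkSession,
                   bootstrapServers: String,
                   usageTopic: String,
                   valueSchema: StructType): DataFrame = {
  val raw = spark.read                              // batch read, not readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("subscribe", usageTopic)
    .option("startingOffsets", "earliest")
    .load()

  val parsed = raw
    .select(col("key").cast(StringType).as("key"),
            from_json(col("value").cast(StringType), valueSchema).as("json"),
            col("timestamp").as("kafkaTimestamp"))  // renamed to avoid clashing with json.timestamp
    .select(col("key"), col("json.*"), col("kafkaTimestamp"))

  // 1) keep only the latest record per Kafka key
  val latestPerKey = parsed
    .withColumn("rn", row_number().over(
      Window.partitionBy("key").orderBy(col("kafkaTimestamp").desc)))
    .filter(col("rn") === 1)
    .drop("rn", "kafkaTimestamp")

  // 2) drop tombstones (their null payload parses to all-null fields),
  // 3) aggregate what is left
  latestPerKey
    .filter(col("user").isNotNull)
    .groupBy("user")
    .agg(max("timestamp").as("maxTime"), max("pct").as("maxPct"))
}

// Use the original stream only as a trigger: whenever new records arrive,
// recompute and publish the corrected aggregate.
val processBatch: (DataFrame, Long) => Unit = (_, _) => {
  recomputeUsage(spark, bootstrapServers, usageTopic, valueSchema)
    .show(false)                                    // or write the result back to Kafka here
}

val sq = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", usageTopic)
  .load()
  .writeStream
  .foreachBatch(processBatch)
  .start()

sq.awaitTermination()

Re-reading the whole topic on every micro-batch is only reasonable for a prototype or a compacted topic of modest size; at scale you would rather maintain the latest value per key in an external table and aggregate from that. Ordering by the Kafka offset instead of the message timestamp would also work and avoids timestamp ties.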

Michael Heil
  • could you explain how "to select only the latest value for each key"? – MaatDeamon Jul 30 '23 at 01:11
  • @MaatDeamon using max function should work on timestamp/long columns – OneCricketeer Jul 30 '23 at 11:49
  • I still fail to see how that does the trick. Can you provide a complete example? – MaatDeamon Jul 30 '23 at 14:40
  • It looks to me like something that was suggested for Flink back when it did not have full support for dynamic tables https://stackoverflow.com/questions/48554999/apache-flink-how-to-enable-upsert-mode-for-dynamic-tables . Still, the concept of dynamic tables was already there. Here I just fail to see how the delete would work. Upon research it seems the only way to do it with Spark is via Delta tables from Databricks – MaatDeamon Jul 30 '23 at 14:56