
I am using Spark Structured Streaming to read from a Kafka topic (say, topic1) and a Kafka sink to write the results to another topic (topic1-result). I can see that the messages are not being removed from topic1 after they have been written to the other topic via the sink.

// Subscribe to 1 topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("subscribe", "topic1")
  .load()

// Sink to another topic
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("checkpointLocation", "/tmp/checkpoint1")
  .option("topic", "topic1-result")
  .start()

The documentation says we cannot use auto-commit with Structured Streaming:

enable.auto.commit: Kafka source doesn’t commit any offset.

But how do I acknowledge messages and remove the processed messages from the source topic (topic1)?


1 Answer


Two considerations:

  1. Messages are not removed from Kafka once you have committed them. When your consumer commits, Kafka only advances the committed offset of that topic for the consumer group; the messages themselves stay in the topic until the retention time you configured for it expires (see the first sketch below).

  2. Indeed, the Kafka source does not commit anything. The stream stores the offset that points to the next message in the streaming query's checkpoint directory, so when your stream restarts it picks up the last stored offset and consumes from there (see the second sketch below).
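If records really have to disappear from topic1 sooner, that is a topic-retention setting, not something a commit does. Below is a minimal sketch (not part of the original answer) of lowering retention.ms with Kafka's AdminClient; the broker address and the one-day value are placeholders, and incrementalAlterConfigs needs Kafka 2.3 or newer. Note that the broker deletes whole expired log segments, not individual processed messages.

// Sketch: lower retention on topic1 so old records become eligible for deletion sooner
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

val adminProps = new Properties()
adminProps.put("bootstrap.servers", "host1:port1")
val admin = AdminClient.create(adminProps)

val topicRes = new ConfigResource(ConfigResource.Type.TOPIC, "topic1")
// 86400000 ms = 1 day; example value only
val setRetention = new AlterConfigOp(new ConfigEntry("retention.ms", "86400000"), AlterConfigOp.OpType.SET)

admin.incrementalAlterConfigs(
  Collections.singletonMap[ConfigResource, java.util.Collection[AlterConfigOp]](
    topicRes, Collections.singletonList(setRetention))
).all().get()

admin.close()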
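As for the offsets kept in the checkpoint: restarting the same query with the same checkpointLocation is enough to continue from the last stored offset. A sketch under that assumption; startingOffsets is only honoured the very first time the query starts, before a checkpoint exists:

// Restart of the same query: with the same checkpointLocation, Spark ignores
// "startingOffsets" and resumes from the offsets recorded in /tmp/checkpoint1
val resumed = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")   // used only when no checkpoint exists yet
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("checkpointLocation", "/tmp/checkpoint1") // same directory as the original query
  .option("topic", "topic1-result")
  .start()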

Emiliano Martinez
  • @vkt You would have to convert down to RDD and handle it manually. But checkpoints work fine, for the most part (assuming you use reliable, distributed storage): https://stackoverflow.com/a/46174353/2308683 – OneCricketeer Feb 12 '20 at 15:02
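For completeness, the "handle it manually" route from the comment is the DStream API (spark-streaming-kafka-0-10) described in the linked answer, not Structured Streaming. A rough sketch, assuming a 10-second batch interval and a hypothetical consumer group id:

// Sketch: manual offset commit with the Kafka direct DStream
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "host1:port1",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "topic1-consumer",            // hypothetical group id
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("topic1"), kafkaParams))

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch here ...
  // commit the offsets back to Kafka only after the batch has been handled
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

ssc.start()
ssc.awaitTermination()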