I am using Spark Structured Streaming to read from a Kafka topic (say, topic1) and a Kafka sink to write to another topic (topic1-result). However, the messages are not removed from topic1 after the sink has written them to the other topic.
// Subscribe to one topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("subscribe", "topic1")
  .load()
// Sink: write the stream to another topic
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("checkpointLocation", "/tmp/checkpoint1")
  .option("topic", "topic1-result")
  .start()
The documentation says that auto-commit cannot be used with Structured Streaming:
enable.auto.commit: Kafka source doesn’t commit any offset.
How, then, do I acknowledge messages and remove the processed ones from the source topic (topic1)?