
I was going through the Spark Structured Streaming + Kafka Integration Guide.

The guide states:

enable.auto.commit: Kafka source doesn’t commit any offset.

So how do I manually commit offsets once my Spark application has successfully processed each record?

Michael Heil
user3243499

1 Answer


tl;dr

It is not possible to commit any offsets to Kafka. Starting with Spark 3.x you can define the name of the Kafka consumer group; however, this still does not allow you to commit any offsets.


Since Spark 3.0.0

According to the Structured Streaming + Kafka Integration Guide you can set the consumer group through the option kafka.group.id:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("kafka.group.id", "myConsumerGroup")
  .load()

However, Spark still does not commit any offsets back to Kafka, so you will not be able to "manually" commit offsets either. The option is meant for setups that use group-based authorization (for example, Role-Based Access Control), where the name of your consumer group usually needs to follow naming conventions.

A full example of a Spark 3.x application is discussed and solved here.

Until Spark 2.4.x

The Spark Structured Streaming + Kafka Integration Guide clearly states how it manages Kafka offsets. Spark does not commit any offsets back to Kafka, because it relies on its own internal offset management for fault tolerance.

The most important Kafka configurations for managing offsets are:

  • group.id: Kafka source will create a unique group id for each query automatically. According to the code, the group.id is derived from:
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
  • auto.offset.reset: Set the source option startingOffsets to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than relying on the Kafka consumer to do it.
  • enable.auto.commit: Kafka source doesn’t commit any offset.

Therefore, up to Spark 2.4.x it is not possible to define a custom group.id for the Kafka consumer, and Structured Streaming is managing the offsets internally and not committing back to Kafka (also not automatically).
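Since startingOffsets takes the place of auto.offset.reset, here is a minimal sketch of how its per-partition JSON value could be built. The object name StartingOffsets and the helper toJson are hypothetical; the special values -2 (earliest) and -1 (latest) are the ones documented for the Kafka source.

```scala
// Hypothetical helper: builds the JSON string accepted by the
// Kafka source option "startingOffsets".
object StartingOffsets {
  // Special offsets understood by the Kafka source
  val Earliest: Long = -2L
  val Latest: Long = -1L

  // topic -> (partition -> offset); keys are sorted for a stable output
  def toJson(offsets: Map[String, Map[Int, Long]]): String =
    offsets.toSeq.sortBy(_._1).map { case (topic, partitions) =>
      val entries = partitions.toSeq.sortBy(_._1)
        .map { case (partition, offset) => s""""$partition":$offset""" }
        .mkString(",")
      s""""$topic":{$entries}"""
    }.mkString("{", ",", "}")
}

// Possible usage on a streaming reader (requires Spark, not shown here):
//   .option("startingOffsets",
//     StartingOffsets.toJson(Map("topic1" -> Map(0 -> 23L, 1 -> StartingOffsets.Earliest))))
```

Note that startingOffsets only applies when a query starts for the first time. A query that resumes from an existing checkpoint continues from the offsets stored there.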

2.4.x in Action

Let's say you have a simple Spark Structured Streaming application that reads and writes to Kafka, like this:

import org.apache.spark.sql.SparkSession

// create SparkSession
val spark = SparkSession.builder()
  .appName("ListenerTester")
  .master("local[*]")
  .getOrCreate()

// read from Kafka topic
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "testingKafkaProducer")
  .option("failOnDataLoss", "false")
  .load()

// write to Kafka topic and set checkpoint directory for this stream
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "testingKafkaProducerOut")
  .option("checkpointLocation", "/home/.../sparkCheckpoint/")
  .start()
  .awaitTermination() // block so that a submitted application keeps running

Offset Management by Spark

Once this application is submitted and data is being processed, the corresponding offset can be found in the checkpoint directory:

myCheckpointDir/offsets/

{"testingKafkaProducer":{"0":1}}

Here the entry in the checkpoint file confirms that the next offset of partition 0 to be consumed is 1. It implies that the application has already processed offset 0 from partition 0 of the topic named testingKafkaProducer.
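If you want to inspect such an entry programmatically, the following is a minimal sketch that parses a line of the shape shown above with plain regular expressions. The object name CheckpointOffsets is hypothetical, and the sketch assumes the flat topic/partition/offset layout from the example rather than handling arbitrary JSON.

```scala
// Hypothetical helper: parses an offsets line such as
// {"testingKafkaProducer":{"0":1}} into topic -> (partition -> next offset).
object CheckpointOffsets {
  // Matches "topic":{ ...partition entries... }
  private val TopicBlock = """"([^"]+)"\s*:\s*\{([^}]*)\}""".r
  // Matches "partition":offset inside a topic block
  private val PartitionEntry = """"(\d+)"\s*:\s*(-?\d+)""".r

  def parse(line: String): Map[String, Map[Int, Long]] =
    TopicBlock.findAllMatchIn(line).map { topic =>
      val partitions = PartitionEntry.findAllMatchIn(topic.group(2))
        .map(entry => entry.group(1).toInt -> entry.group(2).toLong)
        .toMap
      topic.group(1) -> partitions
    }.toMap
}

// The value stored per partition is the NEXT offset to be consumed.
val nextOffsets = CheckpointOffsets.parse("""{"testingKafkaProducer":{"0":1}}""")
```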

More on the fault-tolerance semantics is given in the Spark documentation.

Offset Management by Kafka

However, as stated in the documentation, the offset is not committed back to Kafka. This can be checked by running the kafka-consumer-groups.sh script of the Kafka installation.

./kafka/current/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group "spark-kafka-source-92ea6f85-[...]-driver-0"

TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID      HOST         CLIENT-ID
testingKafkaProducer 0          -               1               -    consumer-1-[...] /127.0.0.1   consumer-1

The current offset for this application is unknown to Kafka as it has never been committed.

Possible Workaround

Please carefully read the comments below from Spark committer @JungtaekLim about the workaround: "Spark's fault tolerance guarantee is based on the fact Spark has a full control of offset management, and they're voiding the guarantee if they're trying to modify it. (e.g. If they change to commit offset to Kafka, then there's no batch information and if Spark needs to move back to the specific batch "behind" guarantee is no longer valid.)"

While doing some research on the web, I have seen that you could commit offsets from the onQueryProgress callback of a custom StreamingQueryListener. That way, you could have a consumer group that keeps track of the current progress. However, its progress is not necessarily aligned with the consumer group that Spark actually uses for reading.
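To make the idea concrete, here is a rough sketch of such a listener. It assumes spark-sql and kafka-clients are on the classpath and that the endOffset reported for a Kafka source has the same topic/partition/offset JSON layout as the checkpoint entry. The class name OffsetMirrorListener and the parameter mirrorGroupId are hypothetical. The offsets go to a separate group id purely for monitoring; Spark never reads them, so this does not change where a query resumes.

```scala
import java.util.Properties
import scala.collection.JavaConverters._ // on Scala 2.13, scala.jdk.CollectionConverters._ also works

import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Hypothetical listener: mirrors the end offsets of each finished micro-batch
// to a separate consumer group so that Kafka tooling can display progress.
class OffsetMirrorListener(bootstrapServers: String, mirrorGroupId: String)
    extends StreamingQueryListener {

  private val TopicBlock = """"([^"]+)"\s*:\s*\{([^}]*)\}""".r
  private val PartitionEntry = """"(\d+)"\s*:\s*(-?\d+)""".r

  // Created lazily so that constructing the listener does not touch Kafka
  private lazy val consumer = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("group.id", mirrorGroupId)
    props.put("enable.auto.commit", "false")
    props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
    props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)
    new KafkaConsumer[Array[Byte], Array[Byte]](props)
  }

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // Assumption: every source with an endOffset is a Kafka source
    val offsets = for {
      source <- event.progress.sources.toSeq
      if source.endOffset != null
      topic <- TopicBlock.findAllMatchIn(source.endOffset)
      entry <- PartitionEntry.findAllMatchIn(topic.group(2))
    } yield (
      new TopicPartition(topic.group(1), entry.group(1).toInt),
      new OffsetAndMetadata(entry.group(2).toLong)
    )
    if (offsets.nonEmpty) consumer.commitSync(offsets.toMap.asJava)
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    consumer.close()
}

// Possible registration, with an existing SparkSession named spark:
//   spark.streams.addListener(new OffsetMirrorListener("localhost:9092", "myMonitoringGroup"))
```

The listener receives events for every query in the session, so a real setup would probably filter by query id or name. The mirrored group can lag behind or run ahead of what Spark considers processed, which is why it is only suitable for display in monitoring tools.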

Here are some links you may find helpful:

Michael Heil
  • Thanks for referring my PR and repository, but I have to correct one thing - there's NO way to commit offset manually. That's by intention to give Spark full control of offset management, instead of relying on Kafka. My PR & repository is to commit offset on "different" group ID, so that end users are able to leverage such information to integrate with Kafka ecosystem UI/admin tool. – Jungtaek Lim Jan 04 '21 at 07:49
  • Hi @JungtaekLim, thank you for making this clear. I re-read my answer and your statement already seems to be reflected with my sentence "Structured Streaming is managing the offsets internally and not committing back to Kafka (also not automatically)." Let me know, if you still think this is confusing or mis-leading. – Michael Heil Jan 04 '21 at 07:55
  • I'm sorry, but I have to say again there's NO possible approach and end users shouldn't try to do it. The answer should be simply NO. Spark's fault tolerance guarantee is based on the fact Spark has a full control of offset management, and they're voiding the guarantee if they're trying to modify it. (e.g. If they change to commit offset to Kafka, then there's no batch information and if Spark needs to move back to the specific batch "behind" guarantee is no longer valid.) – Jungtaek Lim Jan 04 '21 at 08:10
  • Thank you again @JungtaekLim for taking the time to read an "old" answer and provide some very helpful insights. I fully agree with your arguments and tried to point it out in my answer. – Michael Heil Jan 04 '21 at 08:41
  • My pleasure. Actually I somehow visited this because someone got mislead by this answer and consider my project as a solution on actual problem (regarding offset issue on Kafka data source), whereas it isn't and it can't be. – Jungtaek Lim Jan 04 '21 at 22:54
  • I'm not sure I follow how Confluent RBAC documentation affects whether Spark manages offsets or Kafka does – OneCricketeer May 10 '22 at 14:58