I have a long running spark structured streaming job which is ingesting kafka data. I have one concern as below. If the job is failed due to some reason and restart later, how to ensure kafka data will be ingested from the breaking point instead of always ingesting current and later data when the job is restarting. Do I need to specifiy explicitly something like consumer group and auto.offet.reset, etc? Are they supported in spark kafka ingestion? Thanks!

- 59,682
- 7
- 117
- 137

- 295
- 4
- 22
-
Thanks, I am talking about consuming. Just want to set the consumer group id to ensure the offset is kept if the spark job is failed. My spark is 2.4.6. kafka lib is 0.10. When I set group.id, I get the following error "Exception in thread "main" java.lang.IllegalArgumentException: Kafka option 'group.id' is not supported as user-specified consumer groups are not used to track offsets." – yyuankm Sep 05 '20 at 09:34
1 Answers
According to the Spark Structured Integration Guide, Spark itself is keeping track of the offsets and there are no offsets committed back to Kafka. That means if your Spark Streaming job fails and you restart it all necessary information on the offsets is stored in Spark's checkpointing files. That way your application will know where it left off and continue to process the remaining data.
I have written more details about setting group.id
and Spark's checkpointing of offsets in another post
Here are the most important Kafka specific configurations for your Spark Structured Streaming jobs:
group.id: Kafka source will create a unique group id for each query automatically. According to the code the
group.id
will automatically be set to
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}
auto.offset.reset: Set the source option startingOffsets to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than rely on the kafka Consumer to do it
enable.auto.commit: Kafka source doesn’t commit any offset.
Therefore, in Structured Streaming it is currently not possible to define your custom group.id for Kafka Consumer and Structured Streaming is managing the offsets internally and not committing back to Kafka (also not automatically).

- 16,250
- 3
- 42
- 77
-
Thanks mike, but I am still some confused. Looking at the link https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html, It clearly mentioned that 'kafka.group.id' is possible to set, just need to be very careful. I am wondering if the custom group.id for Kafka consumer is not possible, or just possible in some latest version, such as 3.0.0. thanks ```The Kafka group id to use in Kafka consumer while reading from Kafka. Use this with caution. By default, each query generates a unique group id for reading data``` – yyuankm Sep 05 '20 at 11:04
-
yes, only possible in v3, not in 2.4.6. Please also look at the answers given in your other [question](https://stackoverflow.com/questions/63203448/how-to-specify-the-group-id-of-kafka-consumer-for-spark-structured-streaming) you asked about a similar topic. – Michael Heil Sep 05 '20 at 11:09
-
Thanks mike. yeah, I also asked one similar topic before, but not very certain yet. May I understand that if the following assumptions are true for setting kafka.group.id in v3? 1) Kafka broker will be able to maintain the offsets by itself as the kafka standard. 2) The custom group_id will override the internal group_id maintained by spark to submit to kafka broker. 3) Spark will commit back to kafka automatically. I do not need do anything else to avoid the data loss, such as to commit the offset manually, etc. right? – yyuankm Sep 05 '20 at 11:22
-
Thanks for the reminder. I am some new to Stackoverflow, so I forgot to accept the answers. It is true. I already accepted them now based on your comments. For most questions, I also did upvote before. For me, I do respect people's help in answering my questions. As for my following question to you, I also start with "Thanks". As for the similar question, I also explained that it is just to futher confirm. Even as your answer, you also mentioned " it is currently not possible to define your custom group.id", so I want to double confirm if it is supported in spark 3.0, and how is it supported. – yyuankm Sep 06 '20 at 11:42