
I have a streaming job running on Spark 2.1.1, polling Kafka 0.10. I am using the Spark KafkaUtils class to create a DStream, and everything works fine until data ages out of the topic because of the retention policy. My problem comes when I stop my job to make some changes: if any data has aged out of the topic while the job was down, restarting it fails with an error saying that my offsets are out of range.

I have done a lot of research, including looking at the Spark source code, and I see lots of comments like those in this issue: SPARK-19680 - basically saying that data should never be lost silently, so auto.offset.reset is deliberately ignored by Spark. My big question, though, is what can I do now? My topic will not poll in Spark - it dies on startup with the offsets-out-of-range exception. I don't know how to reset the offsets so my job can simply start again. I have not enabled checkpoints, since I read that those are unreliable for this use. I used to have a lot of code to manage offsets, but it appears that Spark ignores requested offsets if there are any committed for the group, so currently I am managing them like this:

val stream = KafkaUtils.createDirectStream[String, T](
    ssc,
    PreferConsistent,
    Subscribe[String, T](topics, kafkaParams))

stream.foreachRDD { (rdd, batchTime) =>
    // capture the offset ranges for this batch before transforming the RDD
    val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    Log.debug("processing new batch...")

    val values = rdd.map(x => x.value())
    val incomingFrame: Dataset[T] = SparkUtils.sparkSession.createDataset(values)(consumer.encoder()).persist

    consumer.processDataset(incomingFrame, batchTime)
    // commit the offsets only after the batch has been fully processed
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
}
ssc.start()
ssc.awaitTermination()

As a workaround I have been changing my group ids, but that is really lame. I know this is expected behavior and that it should not happen in the first place; I just need to know how to get the stream running again. Any help would be appreciated.

Matthias J. Sax
absmiths
  • Hi @absmiths, it's been a while since you asked, but I'm still facing the same issue, and I don't see any answer marked as 'accepted' either. Has there been any update? Did you find a proper way to deal with this? – Borja Jun 26 '20 at 11:53

4 Answers


Here is a block of code I wrote to get past this until a real solution is introduced to spark-streaming-kafka. It basically resets the offsets for any partitions that have aged out, based on the OffsetResetStrategy you set. Just give it the same params map, _params, that you provide to KafkaUtils, and call it from your driver before calling KafkaUtils.create****Stream().

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

final OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(
        _params.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toString().toUpperCase(Locale.ROOT));
if(OffsetResetStrategy.EARLIEST.equals(offsetResetStrategy) || OffsetResetStrategy.LATEST.equals(offsetResetStrategy)) {
    LOG.info("Going to reset consumer offsets");
    final KafkaConsumer<K,V> consumer = new KafkaConsumer<>(_params);

    // Collect every partition of every subscribed topic, along with the
    // offsets currently committed for this group.
    LOG.debug("Fetching current state");
    final List<TopicPartition> parts = new LinkedList<>();
    final Map<TopicPartition, OffsetAndMetadata> currentCommitted = new HashMap<>();
    for(String topic: this.topics()) {
        List<PartitionInfo> info = consumer.partitionsFor(topic);
        for(PartitionInfo i: info) {
            final TopicPartition p = new TopicPartition(topic, i.partition());
            final OffsetAndMetadata m = consumer.committed(p);
            parts.add(p);
            currentCommitted.put(p, m);
        }
    }
    final Map<TopicPartition, Long> beginning = consumer.beginningOffsets(parts);
    final Map<TopicPartition, Long> ending = consumer.endOffsets(parts);

    // Any partition whose committed offset is missing or older than the
    // beginning offset has aged out and needs a fresh commit.
    LOG.debug("Finding what offsets need to be adjusted");
    final Map<TopicPartition, OffsetAndMetadata> newCommit = new HashMap<>();
    for(TopicPartition part: parts) {
        final OffsetAndMetadata m = currentCommitted.get(part);
        final Long begin = beginning.get(part);
        final Long end = ending.get(part);

        if(m == null || m.offset() < begin) {
            LOG.info("Adjusting partition {}-{}; OffsetAndMeta={} Beginning={} End={}",
                    part.topic(), part.partition(), m, begin, end);

            final OffsetAndMetadata newMeta;
            if(OffsetResetStrategy.EARLIEST.equals(offsetResetStrategy)) {
                newMeta = new OffsetAndMetadata(begin);
            } else if(OffsetResetStrategy.LATEST.equals(offsetResetStrategy)) {
                newMeta = new OffsetAndMetadata(end);
            } else {
                newMeta = null;
            }

            LOG.info("New offset to be {}", newMeta);
            if(newMeta != null) {
                newCommit.put(part, newMeta);
            }
        }
    }
    consumer.commitSync(newCommit);
    consumer.close();
}
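
For illustration, a minimal driver-side sketch of that call order. resetAgedOutOffsets is a hypothetical name for a method wrapping the block above, and ssc, topics, and kafkaParams are the same values as in the question:

// Run the offset fix-up with the same params before creating the stream,
// so the committed offsets are back inside the topic's retained range.
// resetAgedOutOffsets is a hypothetical wrapper around the block above.
resetAgedOutOffsets(kafkaParams)

val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams))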
NerdyNick

auto.offset.reset=latest/earliest is applied only when the consumer starts for the first time, i.e. when there are no committed offsets for its group.

There is a Spark JIRA to resolve this issue; until then we have to live with workarounds: https://issues.apache.org/jira/browse/SPARK-19680
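
For reference, a minimal sketch of the consumer config this applies to (broker address and group id are made up). With a fresh group.id there are no committed offsets, so auto.offset.reset is honored on the first run:

import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "my-fresh-group",     // new group = no committed offsets yet
    "auto.offset.reset" -> "earliest",  // honored only while nothing is committed
    "enable.auto.commit" -> (false: java.lang.Boolean)
)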

anand babu

Try

auto.offset.reset=latest

Or

auto.offset.reset=earliest

earliest: automatically reset the offset to the earliest offset

latest: automatically reset the offset to the latest offset

none: throw an exception to the consumer if no previous offset is found for the consumer's group

anything else: throw an exception to the consumer.

One more thing that affects which offset values correspond to earliest and latest is the log retention policy. Imagine you have a topic with retention configured to 1 hour. You produce 10 messages, and then an hour later you produce 10 more. The largest offset will remain the same, but the smallest one can no longer be 0, because Kafka will already have removed the first 10 messages: the smallest available offset will be 10.
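
You can see this effect directly by probing the topic with a plain Kafka consumer (a sketch; broker, topic, and counts are illustrative). After retention removes the first 10 messages, the beginning offset is 10 while the end offset is still 20:

import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
val tp = new TopicPartition("my-topic", 0)
println(consumer.beginningOffsets(Arrays.asList(tp)))  // {my-topic-0=10} after retention
println(consumer.endOffsets(Arrays.asList(tp)))        // {my-topic-0=20}
consumer.close()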

Kishore
vaquar khan
  • I tried that at first and was quite perplexed until I read that the KafkaUtils class blanks out that parameter because they deem you to be too ignorant to use it: 17/10/06 15:03:55 WARN KafkaUtils: overriding auto.offset.reset to none for executor – absmiths Oct 06 '17 at 19:16
  • It does not work at all! – user2088250 Oct 14 '21 at 09:40

This problem was solved in Structured Streaming by setting the option "failOnDataLoss" = "false". It is unclear why there is no such option in the Spark DStream framework.
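
For anyone on Structured Streaming, a sketch of what that looks like (broker and topic names are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("kafka-reader").getOrCreate()

val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "my-topic")
    .option("failOnDataLoss", "false")  // skip aged-out offsets instead of failing
    .load()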

This is a BIG question for the Spark developers!

In our projects, we tried to solve this problem by resetting the offsets to earliest + 5 minutes ... it helps in most cases.
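
A hedged sketch of one way to do that reset (the method and names are ours, not a standard API, and offsetsForTimes needs the 0.10.1+ client): read the first retained record to learn its timestamp, then commit the offsets corresponding to that timestamp plus a five-minute margin.

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

def commitEarliestPlusMargin(consumer: KafkaConsumer[String, String],
                             tp: TopicPartition): Unit = {
  val parts = java.util.Collections.singletonList(tp)
  consumer.assign(parts)
  consumer.seekToBeginning(parts)
  val records = consumer.poll(5000L)  // fetch the oldest retained record
  if (!records.isEmpty) {
    val oldestTs = records.iterator().next().timestamp()
    // find the first offset at or after (oldest timestamp + 5 minutes)
    val query = Map(tp -> java.lang.Long.valueOf(oldestTs + 5 * 60 * 1000L)).asJava
    val found = consumer.offsetsForTimes(query).get(tp)
    if (found != null) {
      consumer.commitSync(Map(tp -> new OffsetAndMetadata(found.offset())).asJava)
    }
  }
}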