
I was wondering if anyone has come across this (it's probably very straightforward). I have a setup where each server is configured as follows:

1) 1 consumer per consumer group (e.g. consumer group "cg1" has only 1 consumer) at any time.

2) 1 consumer group can subscribe to multiple topics.

3) If the server goes down, upon restart the consumer group rejoins and subscribes to the topic again.

Currently the way I am consuming messages is (without posting the whole code):

1) I instantiate a new consumer (set up as part of a TimerTask that executes every 30s).

2) I load all the consumer properties from a Properties file.

3) I assign the same group.id every time.

4) I consume messages.

5) I commit offsets (enable.auto.commit=false) using commitSync().

6) I close the consumer.

My topics have 6 partitions and a replication factor of 3. I close the consumer every time I finish committing offsets because it's probably safer to release the resources; a rough sketch of the scheduled run is below.
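For context, the scheduled run (steps 1-6 above) looks roughly like this. It is a minimal sketch rather than my exact code; the ConsumerTask class name, the consumer.properties path and the topic names are placeholders.

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.kafka.clients.consumer.KafkaConsumer;

// Rough shape of the 30s scheduled run described above (placeholder names).
public class ConsumerTask extends TimerTask {

    @Override
    public void run() {
        // step 2: load all consumer properties (including group.id=samegroup1)
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream("consumer.properties")) {
            props.load(in);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }

        // steps 1 and 3: a brand new consumer each run, always with the same group.id
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        try {
            consumer.subscribe(Arrays.asList("topic1", "topic2")); // placeholder topic names
            // steps 4 and 5: poll, process and commitSync() happen in process() (shown below)
        } finally {
            // step 6: release the consumer's resources every time
            consumer.close();
        }
    }

    public static void main(String[] args) {
        new Timer().scheduleAtFixedRate(new ConsumerTask(), 0, 30000); // every 30s
    }
}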

It seems that every time my server restarts, I am consuming the messages again. I have auto.offset.reset=earliest, which means that when no valid committed offset can be retrieved for a consumer group, I will always start from the earliest available offset. BUT this doesn't affect an existing consumer group that has successfully committed offsets to the cluster: I should start receiving messages from the last committed offsets even if I rejoin.

Is there something I have misunderstood about consumer groups or consumers that plays a key part in the auto.offset.reset setting? Or do I actually need to do something manually here? I am assuming that having more partitions than actual consumers per group is causing problems, but I would be happy to learn more.
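To rule out the reset path, one thing I can do is read back what the broker has stored for the group between runs. A minimal sketch, assuming the probe runs while no other consumer in the group is active; "mytopic" is a placeholder for one of my topic names:

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Probe the committed offsets of group "samegroup1" without consuming anything.
public class CommittedOffsetCheck {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "samegroup1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> probe = new KafkaConsumer<>(props)) {
            // "mytopic" is a placeholder; the topic has 6 partitions (0..5).
            for (int p = 0; p < 6; p++) {
                TopicPartition tp = new TopicPartition("mytopic", p);
                OffsetAndMetadata committed = probe.committed(tp);
                // null means the broker has no stored offset for this group/partition,
                // in which case auto.offset.reset decides where the next consumer starts.
                System.out.println(tp + " committed=" + committed);
            }
        }
    }
}

The kafka-consumer-groups.sh command suggested in the comments below gives the same information from the command line.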

** SAMPLE CONSUMER CODE **

public void process() {
    logger.info("beginning of process()");
    ConsumerRecords<byte[], byte[]> records = this.getConsumer().poll(KafkaConstants.KAFKA_POLL_TIME_MILLIS);
    if (records != null && records.count() > 0) {

        // Prescribed by Kafka API to have finer control over offsets
        for (TopicPartition partition : records.partitions()) {

            List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition);
            for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
                try {
                    logger.info("beginning of processEachRecord()");
                    this.processEachRecord(record);
                } catch (Exception e) {
                    logger.info("Exception whilst processing messages");
                    logger.error(e);
                    logger.info("Closing consumer after exception in processing");
                    this.getConsumer().close();
                    return;
                }
            }

            try {
                // Commit once per partition, after all of its records have been processed,
                // using the offset one past the last record in the batch.
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                this.getConsumer().commitSync(
                        Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            } catch (CommitFailedException cfe) {
                logger.info("Commit Failed------");
                logger.error(cfe);
                logger.info("Closing consumer after failed commit");
                this.getConsumer().close();
                return;
            }
        }
    }
    logger.info("Total records=" + records.count());
    logger.info("Closing consumer");
    this.getConsumer().close();

}

consumer config:

key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id=samegroup1
enable.auto.commit=false
auto.offset.reset=earliest
fetch.min.bytes=10
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094

Thanks to Vahid H and Hans J who commented - this SO answer also explains what might have gone wrong for me.

broker config (I have a 3-broker/ZooKeeper cluster, so ignore the ids):

broker.id=2
listeners=PLAINTEXT://localhost:9094
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=999999999
log.dirs=/tmp/kafka-logs-3
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=2
log.retention.minutes=45
log.retention.bytes=20971520
log.segment.bytes=10485760
log.roll.hours=1    
log.retention.check.interval.ms=300000
offsets.retention.minutes=20
offsets.retention.check.interval.ms=300000
log.cleanup.policy=compact,delete
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=30000
compression.type=gzip
delete.topic.enable=true
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/tmp/kafka_metrics
kafka.csv.metrics.reporter.enabled=false

This is somewhat important for me because, with auto.offset.reset=latest together with log.retention.minutes and offsets.retention.minutes, I should be able to prevent duplicates (an "exactly once" mechanism). But KAFKA-1194 is causing the following:

kafka.common.KafkaStorageException: Failed to change the log file suffix from  to .deleted for log segment 2
        at kafka.log.LogSegment.kafkaStorageException$1(LogSegment.scala:340)
        at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:342)
        at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:981)
        at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:971)
        at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:673)
        at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:673)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at kafka.log.Log.deleteOldSegments(Log.scala:673)
        at kafka.log.Log.deleteRetenionMsBreachedSegments(Log.scala:703)
        at kafka.log.Log.deleteOldSegments(Log.scala:697)
        at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:474)
        at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:472)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.log.LogManager.cleanupLogs(LogManager.scala:472)
        at kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:200)
        at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
        at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:57)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.file.FileSystemException: \kafka2\40-0\00000000000000000002.log -> \kafka2\40-0\00000000000000000002.log.deleted: The process cannot access the file because it is being used by another process.

        at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
        at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
        at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
        at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
        at java.nio.file.Files.move(Files.java:1395)
        at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:711)
        at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:210)
        ... 28 more
        Suppressed: java.nio.file.FileSystemException: \kafka2\40-0\00000000000000000002.log -> \kafka2\40-0\00000000000000000002.log.deleted: The process cannot access the file because it is being used by another process.

                at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
                at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
                at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:301)
                at 

This is why I have to retain the offsets and the topics for a long period. If anyone has a better suggestion on how I can maintain non-duplicate message consumption without relying on log cleanup, it would be helpful.

  • This works for me as expected with the default configuration. Have you overwritten any of the default configs? – vahid Jul 05 '17 at 23:16
  • @vahid I have set `auto.offset.reset=earliest`, and I am polling for 1000ms every time. Attached is my sample consumer code. – ha9u63a7 Jul 06 '17 at 00:04
  • Thanks for sharing your code; however, this code still works for me. Have you checked the group offsets in between running consumers to make sure they increase as new messages come in, and don't get reset for some reason? – vahid Jul 06 '17 at 01:42
  • @vahid sorry, I probably don't have that configured. This is also why I am getting a delay/some kind of message loss, as I cannot see them all the time, even when I can see the message being produced and sent successfully. Could you please explain what you meant? – ha9u63a7 Jul 06 '17 at 02:23
  • What I meant to ask was if you could, for example, run the KafkaConsumerGroup command and verify that the offsets keep going up as you produce and consume messages and never reset to an earlier offset. You could try this to monitor offsets as you go through producing/consuming messages: `watch bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group samegroup1` – vahid Jul 06 '17 at 03:12
  • Offsets expire in 24 hours (by default) so make sure that's not your problem. Also if you did the QuickStart make sure your Kafka log.dir is not in /tmp because then a reboot also loses all Kafka data including offsets. – Hans Jespersen Jul 06 '17 at 03:26
  • @HansJespersen I actually have Zookeeper `data.dir` and Kafka `log.dirs` both in `C:/tmp/`. Do you mean to say that I need to keep them in a different location than `C:/tmp/`, e.g. `C:/logdirs`? – ha9u63a7 Jul 06 '17 at 08:47
  • If you're using Windows you should be fine with respect to `tmp` folder. On Linux the content of the `/tmp` folder gets wiped out after a reboot. – vahid Jul 06 '17 at 18:45
  • @vahid Thanks. I am now stuck with the issue where log deletion is constantly facing IOException since the files are being used by another process, even when I have no consumer activity. This is the same as KAFKA-1194. – ha9u63a7 Jul 06 '17 at 19:45
  • That seems to be a different issue from what you reported above. Hopefully that JIRA gets fixed soon. I just noticed in your broker config you use both `log.retention.hours=2` and `log.retention.minutes=45`. According to the doc, `log.retention.minutes` takes precedence over `log.retention.hours`. Therefore, your Kafka logs are deleted after 45 minutes with these configs. – vahid Jul 06 '17 at 20:52
  • @vahid yes, I did it purposefully to trigger the cleanup earlier. Since this isn't working, I am consuming duplicates and getting false positives. – ha9u63a7 Jul 06 '17 at 21:08
  • Did you try monitoring the offsets using the consumer group command mentioned earlier? – vahid Jul 06 '17 at 21:58
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/148555/discussion-between-ha9u63ar-and-vahid). – ha9u63a7 Jul 06 '17 at 22:09
  • Hi, I am facing a similar issue with the messages getting consumed again every time after a restart. May I know how you managed to resolve the issue? – user1372469 Jun 25 '22 at 13:05
