I was wondering if anyone has come across this (it is probably very straightforward). I have a setup where each server is configured as follows:
1) One consumer per consumer group (e.g. consumer group "cg1" has only one consumer) at any time.
2) One consumer group can subscribe to multiple topics.
3) If the server goes down, the consumer group rejoins and subscribes to the topics again upon restart.
Currently the way I am consuming messages is (without posting the whole code; a sketch follows this list):
1) I instantiate a new consumer (set up as part of a TimerTask that executes every 30s).
2) I load all the consumer properties from a properties file.
3) I assign the same group.id every time.
4) I consume messages.
5) I commit offsets (enable.auto.commit=false) using .commitSync().
6) I close the consumer.
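For reference, here is a minimal sketch of that cycle; the class name, properties file path and topic names are placeholders, not my actual code:
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerCycle extends TimerTask {

    @Override
    public void run() {
        try {
            // step 2: load all consumer properties (including the fixed group.id)
            Properties props = new Properties();
            try (InputStream in = new FileInputStream("consumer.properties")) { // placeholder path
                props.load(in);
            }
            // steps 1 and 3: a fresh consumer with the same group.id every run
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Arrays.asList("topic-a", "topic-b")); // placeholder topics
                // steps 4 and 5: poll, process and commitSync() as in the sample code below
                process(consumer);
            } // step 6: try-with-resources closes the consumer after every run
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void process(KafkaConsumer<byte[], byte[]> consumer) {
        // see ** SAMPLE CONSUMER CODE ** below
    }

    public static void main(String[] args) {
        new Timer().scheduleAtFixedRate(new ConsumerCycle(), 0, 30_000); // every 30s
    }
}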
My topics have 6 partitions and a replication factor of 3. I would like to close the consumer every time I finish committing offsets, because it is probably safer to release resources back.
It seems that every time my server restarts, I am consuming the messages again. I have auto.offset.reset=earliest, which means that when no valid committed offset can be retrieved for a consumer group, I will always start from the earliest available offset. BUT this should not affect an existing consumer group that has successfully committed offsets to the cluster; I should start receiving messages from the last committed offsets even after rejoining.
Is there something I have misunderstood about consumer groups or consumers that plays a key part in the auto.offset.reset setting? Or do I actually need to do some manual offset management here? I am assuming that having more partitions than actual consumers per group is causing problems, but I would be happy to learn more.
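One way to narrow this down is to check whether the brokers actually still hold committed offsets for the group after a restart; a rough sketch (the topic name is a placeholder):
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class CommittedOffsetCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        props.put("group.id", "samegroup1"); // the group whose offsets we want to inspect
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            for (PartitionInfo pi : consumer.partitionsFor("my-topic")) { // placeholder topic
                TopicPartition tp = new TopicPartition(pi.topic(), pi.partition());
                OffsetAndMetadata committed = consumer.committed(tp);
                // null means no offset is stored for this group/partition, so
                // auto.offset.reset=earliest will apply on the next subscribe
                System.out.println(tp + " -> " + committed);
            }
        }
    }
}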
** SAMPLE CONSUMER CODE **
public void process() {
    logger.info("beginning of process()");
    ConsumerRecords<byte[], byte[]> records = this.getConsumer().poll(KafkaConstants.KAFKA_POLL_TIME_MILLIS);
    if (records != null && records.count() > 0) {
        // Prescribed by Kafka API to have finer control over offsets:
        // process one partition's records, then commit that partition.
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition);
            for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
                try {
                    logger.info("beginning of processEachRecord()");
                    this.processEachRecord(record);
                } catch (Exception e) {
                    logger.info("Exception whilst processing messages");
                    logger.error(e);
                    logger.info("Closing consumer after exception in processing");
                    this.getConsumer().close();
                    return;
                }
            }
            // Commit once per partition, after all its records have been processed;
            // committing inside the record loop would mark unprocessed records as consumed.
            try {
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                // The committed offset is the next offset to read, hence the +1.
                this.getConsumer().commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            } catch (CommitFailedException cfe) {
                logger.info("Commit Failed------");
                logger.error(cfe);
                logger.info("Closing consumer after failed commit");
                this.getConsumer().close();
                return;
            }
        }
    }
    logger.info("Total records=" + records.count());
    logger.info("Closing consumer");
    this.getConsumer().close();
}
consumer config:
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id=samegroup1
enable.auto.commit=false
auto.offset.reset=earliest
fetch.min.bytes=10
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
Thanks Vahid H and Hans J who commented; this SO answer also explains what might have gone wrong for me (most likely the short offsets.retention.minutes below: once committed offsets expire, auto.offset.reset=earliest applies again on rejoin).
broker config (I have a 3-broker ZooKeeper cluster, so ignore the ids):
broker.id=2
listeners=PLAINTEXT://localhost:9094
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=999999999
log.dirs=/tmp/kafka-logs-3
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=2
log.retention.minutes=45
log.retention.bytes=20971520
log.segment.bytes=10485760
log.roll.hours=1
log.retention.check.interval.ms=300000
offsets.retention.minutes=20
offsets.retention.check.interval.ms=300000
log.cleanup.policy=compact,delete
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=30000
compression.type=gzip
delete.topic.enable=true
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/tmp/kafka_metrics
kafka.csv.metrics.reporter.enabled=false
This is somewhat important for me because, with auto.offset.reset=latest and log.retention.minutes and offsets.retention.minutes tuned together, I should be able to prevent duplicates (an "exactly once"-like mechanism). But KAFKA-1194 is causing the following:
kafka.common.KafkaStorageException: Failed to change the log file suffix from to .deleted for log segment 2
at kafka.log.LogSegment.kafkaStorageException$1(LogSegment.scala:340)
at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:342)
at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:981)
at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:971)
at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:673)
at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:673)
at scala.collection.immutable.List.foreach(List.scala:318)
at kafka.log.Log.deleteOldSegments(Log.scala:673)
at kafka.log.Log.deleteRetenionMsBreachedSegments(Log.scala:703)
at kafka.log.Log.deleteOldSegments(Log.scala:697)
at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:474)
at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:472)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.log.LogManager.cleanupLogs(LogManager.scala:472)
at kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:200)
at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:57)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.file.FileSystemException: \kafka2\40-0\00000000000000000002.log -> \kafka2\40-0\00000000000000000002.log.deleted: The process cannot access the file because it is being used by another process.
at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
at java.nio.file.Files.move(Files.java:1395)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:711)
at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:210)
... 28 more
Suppressed: java.nio.file.FileSystemException: \kafka2\40-0\00000000000000000002.log -> \kafka2\40-0\00000000000000000002.log.deleted: The process cannot access the file because it is being used by another process.
at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:301)
at
This is why I have to retain the offsets and the topics for a long period. If anyone has a better suggestion on how I can maintain non-duplicate message consumption without relying on log cleanup, it would be helpful.
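For what it is worth, the standard alternative I know of is to keep offsets in the application's own durable store (written atomically with the processing result) and seek() to them on startup, so nothing depends on offsets.retention.minutes or log cleanup timing. A minimal sketch, assuming a hypothetical OffsetStore (here just an in-memory map standing in for a database), not my actual code:
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class SeekFromOwnStore {

    // Hypothetical stand-in for a durable store; in practice the offset would be
    // persisted in the same transaction as the processing result.
    static final Map<TopicPartition, Long> offsetStore = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        props.put("group.id", "samegroup1");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // assign() instead of subscribe(), so starting positions are controlled here
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo pi : consumer.partitionsFor("my-topic")) { // placeholder topic
                partitions.add(new TopicPartition(pi.topic(), pi.partition()));
            }
            consumer.assign(partitions);
            for (TopicPartition tp : partitions) {
                Long next = offsetStore.get(tp);
                if (next != null) {
                    consumer.seek(tp, next); // resume from our own stored offset
                } // otherwise auto.offset.reset decides the starting position
            }
            ConsumerRecords<byte[], byte[]> records = consumer.poll(1000L);
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // process the record, then persist offset+1 atomically with the result
                offsetStore.put(new TopicPartition(record.topic(), record.partition()),
                        record.offset() + 1);
            }
        }
    }
}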