2

I want to manually commit messages when all tasks are finished(like pushing message into database ), in my consumer group. How can I disable auto commit and commit messages manually.

jemiloii
  • 24,594
  • 7
  • 54
  • 83
Rakshita Jain
  • 149
  • 2
  • 9

2 Answers2

2

Set autoCommit false

const kafka = require('kafka-node');

const config = require('../config');

const client = new kafka.KafkaClient({
  kafkaHost: config.kafkaHost
});

const consumer = new kafka.Consumer(client, [
  {
    topic: config.kafkaTopic
  }
], {
  autoCommit: false
});

Then commit manually-

consumer.on('message', (message) => {
  console.log('message', message);
  // feed data into db 
  consumer.commit((error, data) => {
    if (error) {
      console.error(error);
    } else {
      console.log('Commit success: ', data);
    }
  });
});
Varun Kumar
  • 2,543
  • 1
  • 23
  • 22
  • I have did the same thing, but still getting the already read message. https://stackoverflow.com/questions/62564644/getting-already-message-from-confluent-kafka-using-kafka-node-nodejs-lib – Vipin Kumar R. Jaiswar Jun 25 '20 at 09:55
0

Use node-rdkafka instead. It works very well . you can commit a message like

consumer.commit({ topic: data.topic, partition: data.partition, offset: data.offset + 1 }, function(err, data) {})
chetan dev
  • 611
  • 2
  • 6
  • 16