
One of my use cases consists of consuming data, performing some operations on it, and producing the result to a new topic.

I'm using the kafkajs npm library (https://www.npmjs.com/package/kafkajs).

I would like to commit offsets manually after each successful operation to avoid any data loss, so I'm using autoCommit: false to disable automatic committing after consumption.

This is the code to commit an offset manually:

consumer.commitOffsets([
  { topic: 'topic-A', partition: 0, offset: '1' }
])
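For context on the snippet above: in Kafka, the committed offset is conventionally the offset of the *next* message the group should read, so after successfully processing the message at offset N you commit N + 1, as a string. A small illustration (the variable names here are examples, not part of kafkajs):

```javascript
// Illustrative: after successfully processing the message at offset 41 of
// topic-A / partition 0, commit 42 (the next offset to be consumed).
const processedOffset = 41; // offset of the message just handled (example value)
const commitPayload = [
  { topic: 'topic-A', partition: 0, offset: String(processedOffset + 1) },
];
// await consumer.commitOffsets(commitPayload);
console.log(commitPayload[0].offset); // → '42'
```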

I've read that committing each offset individually (i.e. immediately after consuming each message) puts load on the brokers and is not recommended.

Could a Kafka expert please advise on the best approach for this use case to avoid any data loss?

Parveen Kumar

1 Answer


To handle commits manually, use the code below:

await consumer.run({
  autoCommit: false,
  eachMessage: async ({ topic, partition, message }) => {
    ...
    // Commit the next offset only after the message has been processed successfully.
    await consumer.commitOffsets([{ topic, partition, offset: (Number(message.offset) + 1).toString() }]);
  },
});
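To address the concern about per-message commits loading the brokers: one common pattern is to process a batch of messages and commit a single offset afterwards. Below is a minimal sketch of that idea; the `nextOffset` and `processBatch` helpers are my own names, not part of kafkajs, and the kafkajs wiring at the bottom is commented out because it assumes a connected consumer:

```javascript
// Given the offset of the last processed message (a string, as kafkajs
// provides it), return the offset string to commit (the next offset to read).
function nextOffset(lastProcessedOffset) {
  return (Number(lastProcessedOffset) + 1).toString();
}

// Process every message in a batch, then return the single offset to commit,
// or null if the batch was empty. If processing throws, nothing is returned,
// so nothing gets committed and the messages are redelivered.
async function processBatch(messages, handleMessage) {
  let last = null;
  for (const message of messages) {
    await handleMessage(message); // your transform / produce-to-new-topic step
    last = message.offset;
  }
  return last === null ? null : nextOffset(last);
}

// Illustrative kafkajs wiring (assumes `consumer` is a connected consumer):
//
// await consumer.run({
//   autoCommit: false,
//   eachBatch: async ({ batch, heartbeat }) => {
//     const offset = await processBatch(batch.messages, handle);
//     if (offset !== null) {
//       await consumer.commitOffsets([
//         { topic: batch.topic, partition: batch.partition, offset },
//       ]);
//     }
//     await heartbeat();
//   },
// });
```

This commits once per batch instead of once per message, which keeps the at-least-once guarantee (an uncommitted batch is redelivered after a crash) while greatly reducing commit traffic to the brokers.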
Karunakaran