I created a single connector that pushes all insert/update events of the collection to the same topic (1 partition). The consumer code takes longer to process an insert event than an update event.
The problem is that the messages are not committed in the order in which they are consumed. Below are the steps and the resulting behavior of the Kafka consumer.
- Insert a record into the collection and update one of its fields within a second.
- The topic now has 2 records: one for the insert event (message 1) and one for the update event (message 2).
- message 1 is consumed
- message 2 is consumed
- message 2 is committed
- message 1 is committed
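The sequence above can be reproduced without Kafka. The following is a minimal sketch, not my actual consumer: a plain Node.js `EventEmitter` stands in for the consumer group, the offsets and delays are made up, and `processData` is a hypothetical stub in which the insert takes longer than the update. Pushing to `log` stands in for the real `commit()` call.

```javascript
const { EventEmitter } = require("node:events");

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical stand-in for processData: inserts take longer than updates.
const processData = (event) => sleep(event.type === "insert" ? 50 : 5);

// Emits both messages back to back and records the order of each step.
async function runInterleaved() {
  const consumer = new EventEmitter(); // stands in for the ConsumerGroup
  const log = [];

  consumer.on("message", async (message) => {
    log.push(`consumed ${message.offset}`);
    // The handler yields here, so the emitter is free to deliver message 2.
    await processData(message.value);
    log.push(`committed ${message.offset}`); // stands in for commit()
  });

  consumer.emit("message", { offset: 1, value: { type: "insert" } });
  consumer.emit("message", { offset: 2, value: { type: "update" } });

  await sleep(100); // long enough for both handlers to finish
  return log;
}

runInterleaved().then((log) => console.log(log.join("\n")));
```

The emitter does not wait for the promise returned by an `async` listener, so both handlers are in flight at the same time and the faster one reaches its commit first.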
Is there any way to wait until message 1 is committed before consuming message 2?
Consumer Code:
const kafkaNode = require("kafka-node");

const startConsumer = async () => {
  // Create a Kafka consumer group (consumer_configs and topic_name are defined elsewhere).
  const kafkaConsumerGroup = new kafkaNode.ConsumerGroup(consumer_configs, topic_name);

  kafkaConsumerGroup.on("connect", () => {
    console.log("Consumer Group Connected Successfully");
  });

  // Listen for messages from Kafka.
  kafkaConsumerGroup.on("message", async (message) => {
    // Process the message; insert events take longer here than update events.
    const data = JSON.parse(message.value);
    const response = await processData(JSON.parse(data.payload));

    // Commit once processing has finished.
    kafkaConsumerGroup.commit((error, data) => {
      if (error) {
        console.log(error);
      } else {
        console.log("Commit success");
      }
    });
  });

  kafkaConsumerGroup.on("error", (error) => {
    console.log(error);
  });
};
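To make the goal concrete, here is a sketch of the ordering I am after. It is not a kafka-node solution: an `EventEmitter` again stands in for the consumer group, `processData` is a hypothetical stub, and pushing to `log` stands in for `commit()`. Each message is chained behind the previous one's promise, so message 2 is not processed until message 1 has been committed.

```javascript
const { EventEmitter } = require("node:events");

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical stand-in for processData: inserts take longer than updates.
const processData = (event) => sleep(event.type === "insert" ? 50 : 5);

// Handles messages strictly one at a time, in arrival order.
async function runSerialized() {
  const consumer = new EventEmitter(); // stands in for the ConsumerGroup
  const log = [];
  let tail = Promise.resolve(); // end of the chain of pending work

  consumer.on("message", (message) => {
    // Queue this message behind every earlier one.
    tail = tail.then(async () => {
      log.push(`processing ${message.offset}`);
      await processData(message.value);
      log.push(`committed ${message.offset}`); // stands in for commit()
    });
  });

  consumer.emit("message", { offset: 1, value: { type: "insert" } });
  consumer.emit("message", { offset: 2, value: { type: "update" } });

  await tail; // settles once both queued messages are done
  return log;
}

runSerialized().then((log) => console.log(log.join("\n")));
```

In this sketch a rejected `processData` would reject every later link in the chain, so a real handler would need its own error handling inside the chained callback.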