1

I created single connector to push all insert/update event of the collection to the same topic (1 partition). The consumer code to process the message for insert event will take some time than update event.

The problem here is that the messages are not committing on the basis of consumed order. Below are the steps and behavior of Kafka consumer.

  1. Insert a record on the collection and update the field within a second.
  2. The topic now has 2 records. One for insert event (message 1) and other for update event (message 2).
  3. message 1 is consumed
  4. message 2 is consumed
  5. message 2 is committed
  6. message 1 is committed

Is there any way I can wait until the message 1 is committed and then consume the message 2.

Consumer Code:

    const startConsumer = async () => {
    
        // Creating a kafka consumer group.
        const kafkaConsumerGroup = new kafkaNode.ConsumerGroup(consumer_configs, topic_name);
    
        kafkaConsumerGroup.on("connect", () => {
            console.log("Consumer Group Connected Successfully");
        });
    
        // Listening for messages from kafka.
        kafkaConsumerGroup.on("message", async (message) => {
    
                //processor code
                const data = JSON.parse(message.value);
                response = await processData(JSON.parse(data.payload));
    
                //Commit
                kafkaConsumerGroup.commit((error, data) => {
                    if (error) {
                        console.log(error);
                    } else {
                        console.log('Commit success ');
                    }
                });
        });
    
        kafkaConsumerGroup.on("error", (error) => {
            console.log(error);
        });
    };
Nandy
  • 666
  • 1
  • 12
  • 27
  • I have modified my post by adding sample code. – Nandy Sep 25 '20 at 09:14
  • I have to make database calls on processData() method and that needs to be await. Without making the method async it is not possible to do database call with await. Please provide your suggestion – Nandy Sep 25 '20 at 09:38
  • https://stackoverflow.com/questions/42469123/how-to-control-the-concurrency-of-processing-messages-by-consumergroup - This link helps me to handle kafka messages in sequence – Nandy Sep 28 '20 at 08:50

0 Answers0