
For Kafka I'm using getmany to read the consumer messages. Out of a total of 650 messages (which would take around 3 days to process), processing happens for around 100-150 records (sometimes 12 hrs, sometimes 24 hrs) and then no further processing happens. The consumer stream is not closed, though: when I produce a new message it does get processed, but I couldn't figure out why the remaining backlog stalls.

I thought that maybe some messages were getting skipped, so to check that I produced only 25 messages - all processed (3 hrs) - and then 100 messages - all processed (12 hrs); in those runs no messages were skipped. The issue only happens when a large number of messages (650) is produced. (A rough way to check the committed offsets for this group is sketched after the code snippet below.)

Code snippet:

import asyncio
import base64
import json
import logging

from aiokafka import AIOKafkaConsumer

# KAFKA_TOPIC, bootstrap_servers, group_id, enable_auto_commit,
# auto_commit_interval_ms and auto_offset_reset come from configuration.
logger = logging.getLogger(__name__)


async def consume(loop, lock):
    logger.info('Inside Consume')

    consumer = AIOKafkaConsumer(KAFKA_TOPIC,
                                loop=loop,
                                bootstrap_servers=bootstrap_servers,
                                group_id=group_id,
                                enable_auto_commit=enable_auto_commit,
                                auto_commit_interval_ms=auto_commit_interval_ms,
                                auto_offset_reset=auto_offset_reset,
                                max_poll_records=1,
                                max_poll_interval_ms=1500000,
                                rebalance_timeout_ms=1500000)

    await consumer.start()
    while True:
        # Fetch at most one record per poll.
        result = await consumer.getmany(timeout_ms=1500000, max_records=1)
        for tp, messages in result.items():
            if messages:
                message = messages[0].value.decode()
                message_json = json.loads(message)
                data_encoded = message_json['payload']

                if data_encoded.get('content') is not None:
                    # 'content' carries a base64-encoded JSON document.
                    content = data_encoded['content']
                    message_dict_dum = base64.b64decode(content)
                    message_dict = json.loads(message_dict_dum)

                    if message_dict.get("key1") is not None:
                        if message_dict["key1"] == "something":
                            logger.info('Hurray new message to process')
                            logger.info(message_dict["id"])
                            await process(content, lock)


async def process(msg, lock):
    # All message processing is serialized behind a single asyncio.Lock.
    async with lock:
        logger.info('processing... -> {}'.format(msg))
        await processmessage(msg)  # processmessage is defined elsewhere
        logger.info('done processing')


if __name__ == "__main__":
    logger.info('Inside main')

    loop = asyncio.new_event_loop()
    lock = asyncio.Lock(loop=loop)

    loop.create_task(consume(loop, lock))
    loop.run_forever()
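
To see whether the stalled records were ever committed or the group is simply lagging, a rough check could be run right after consumer.start() - this is only a sketch, assuming the installed aiokafka exposes assignment(), committed() and end_offsets() (the helper name check_lag is mine):

async def check_lag(consumer):
    # Compare each assigned partition's committed offset with its end offset.
    # If the committed offset stops advancing while the end offset grows,
    # the group has stopped making progress on that partition.
    assignment = consumer.assignment()
    end_offsets = await consumer.end_offsets(assignment)
    for tp in assignment:
        committed = await consumer.committed(tp)
        logger.info('partition=%s committed=%s end=%s',
                    tp.partition, committed, end_offsets[tp])

Calling this periodically from the while True loop (or checking the same numbers with the kafka-consumer-groups tool on the broker) should show whether offsets stop advancing at the point processing stalls.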
  • Did you check your offset for the 650 case? Or do you have only one consumer in your consumer group? – Mathew Apr 17 '23 at 07:45
  • Kafka consumer groups will stop processing if they don't get a heartbeat/commit within a certain amount of time (far less than an hour, often several seconds) – OneCricketeer Apr 17 '23 at 13:39
  • Also, you're missing a lot of else statements to actually debug what could be wrong with your record content and cause it "not to process" - it might actually be consuming, but you're just ignoring the (badly formed) events and not logging any exceptions... – OneCricketeer Apr 17 '23 at 13:41
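
Picking up the last comment: a minimal sketch of how the body of the while True loop in consume could log every skipped or malformed record instead of silently dropping it (only names from the snippet above are reused; this is instrumentation, not a fix):

        for tp, messages in result.items():
            for record in messages:
                try:
                    message_json = json.loads(record.value.decode())
                    data_encoded = message_json['payload']
                    content = data_encoded.get('content')
                    if content is None:
                        logger.warning('Record at offset %s has no content, skipping', record.offset)
                    else:
                        message_dict = json.loads(base64.b64decode(content))
                        if message_dict.get("key1") == "something":
                            await process(content, lock)
                        else:
                            logger.warning('Unexpected key1 %r at offset %s, skipping',
                                           message_dict.get("key1"), record.offset)
                except Exception:
                    logger.exception('Failed to decode/process record at offset %s', record.offset)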

0 Answers