
I am getting an error with the aiokafka library in Python (versions are listed at the end). First a heartbeat fails, and after that the consumer can no longer commit its offsets. This is the log:

Heartbeat failed for group my-group-dag-kafka because it is rebalancing
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 1)for group my-group-dag-kafka.
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
(the OffsetCommit line above is repeated 39 more times)
Auto offset commit failed: [Error 25] UnknownMemberIdError: my-group-dag-kafka
Traceback (most recent call last):
  File "/app/manage.py", line 23, in <module>
    server.serve()
  File "/usr/local/lib/python3.7/site-packages/libbase/base.py", line 146, in serve
    self._start_mq()
  File "/usr/local/lib/python3.7/site-packages/libbase/base.py", line 186, in _start_mq
    self.loop.run_until_complete(self.async_mq_loop())
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.7/site-packages/libbase/base.py", line 202, in async_mq_loop
    async for message in self.consumer:
  File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/consumer.py", line 1220, in __anext__
    return (yield from self.getone())
  File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/consumer.py", line 1101, in getone
    msg = yield from self._fetcher.next_record(partitions)
  File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/fetcher.py", line 1051, in next_record
    yield from waiter
kafka.errors.UnknownMemberIdError: [Error 25] UnknownMemberIdError: my-group-dag-kafka
Unclosed AIOKafkaConsumer
consumer: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x7fd58b49de10>
Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0x7fd58bd9c4d0>
Task was destroyed but it is pending!
task: <Task pending coro=<Sender._sender_routine() running at /usr/local/lib/python3.7/site-packages/aiokafka/producer/sender.py:151> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fd58ab91ad0>()]> cb=[Sender._fail_all()]>

The Python version is 3.7.4, the aiokafka version is 0.5.2, and the consumer configuration is:

self.consumer = AIOKafkaConsumer(
    self.config.KAFKA_TOPIC,
    loop=self.loop,
    bootstrap_servers=self.config.KAFKA_URL,
    group_id=self.config.KAFKA_GROUP_ID,
    fetch_min_bytes=100000)
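The constructor above passes no timing parameters, so aiokafka's defaults apply. In aiokafka the Java-style property names map to snake_case keyword arguments (session_timeout_ms, heartbeat_interval_ms, max_poll_interval_ms). The helper below is a hypothetical sketch, not part of aiokafka: it collects those arguments and checks them against Kafka's guidance that the heartbeat interval should be lower than the session timeout and typically no higher than one third of it. Its defaults are assumed to mirror aiokafka's documented defaults, so verify them against the installed version. The broker additionally rejects session timeouts outside its group.min.session.timeout.ms / group.max.session.timeout.ms range, which this helper does not check.

```python
def consumer_timing_settings(
    session_timeout_ms: int = 10_000,
    heartbeat_interval_ms: int = 3_000,
    max_poll_interval_ms: int = 300_000,
) -> dict:
    """Hypothetical helper: aiokafka timing kwargs after basic consistency checks.

    Defaults are assumed to match aiokafka's documented defaults.
    """
    for name, value in (
        ("session_timeout_ms", session_timeout_ms),
        ("heartbeat_interval_ms", heartbeat_interval_ms),
        ("max_poll_interval_ms", max_poll_interval_ms),
    ):
        if value <= 0:
            raise ValueError(f"{name} must be positive, got {value}")
    # Kafka's guidance: the heartbeat interval must be lower than the session
    # timeout and typically no higher than a third of it, so a few heartbeats
    # can be lost before the broker evicts the member.
    if heartbeat_interval_ms * 3 > session_timeout_ms:
        raise ValueError(
            "heartbeat_interval_ms should be at most session_timeout_ms / 3 "
            f"({heartbeat_interval_ms} vs {session_timeout_ms})"
        )
    return {
        "session_timeout_ms": session_timeout_ms,
        "heartbeat_interval_ms": heartbeat_interval_ms,
        "max_poll_interval_ms": max_poll_interval_ms,
    }
```

The result would be unpacked into the existing call, e.g. AIOKafkaConsumer(self.config.KAFKA_TOPIC, ..., **consumer_timing_settings(session_timeout_ms=30_000, heartbeat_interval_ms=10_000)). A longer session timeout tolerates longer stalls of the event loop, at the cost of noticing a genuinely dead consumer later.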

The error happens intermittently, with no obvious trigger. I have tried changing these settings:

heartbeat.interval.ms
session.timeout.ms
max.poll.interval.ms

But I believe the default values should be fine. What is this error saying? That the consumer is no longer part of the group and the loop simply ends? By the way, after this error happens, the container just dies.


        async for message in self.consumer:
            try:
                asyncio.create_task(self.async_handle_message(message))
            except Exception as e:
                import traceback

                traceback.print_exc()
                print('Exception: ', e, flush=True)
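In the loop above, the try/except only guards the asyncio.create_task() call, which fails only for programming errors such as passing a non-coroutine. Exceptions from async_handle_message happen later, inside the task, and the UnknownMemberIdError in the traceback is raised by the async for itself (from getone()), outside the try block. The "Unclosed AIOKafkaConsumer" line in the log also suggests stop() was never awaited. The sketch below is one hedged way to restructure this, assuming a consumer that supports async for, start() and stop() as AIOKafkaConsumer does; consume, supervise, make_consumer and handler are hypothetical names. It logs handler failures inside each task, bounds the number of in-flight handlers, always stops the consumer, and restarts it after a failure instead of letting the exception end the process.

```python
import asyncio
import logging

log = logging.getLogger(__name__)


async def _run_handler(handler, message) -> None:
    # Handler exceptions surface inside the task, not at the create_task()
    # call site, so they are caught and logged here.
    try:
        await handler(message)
    except asyncio.CancelledError:
        raise
    except Exception:
        log.exception("Handler failed for message %r", message)


async def consume(consumer, handler, max_in_flight: int = 100) -> None:
    """Dispatch each message to a handler task; always stop the consumer on exit."""
    in_flight = asyncio.Semaphore(max_in_flight)
    pending = set()

    def _on_done(task) -> None:
        pending.discard(task)
        in_flight.release()

    try:
        # Errors from the consumer itself (e.g. UnknownMemberIdError raised
        # by getone()) propagate out of this loop, not out of the handlers.
        async for message in consumer:
            await in_flight.acquire()  # back-pressure: wait while too many handlers run
            task = asyncio.create_task(_run_handler(handler, message))
            pending.add(task)
            task.add_done_callback(_on_done)
    finally:
        # Let running handlers finish, then leave the group cleanly.
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)
        await consumer.stop()


async def supervise(make_consumer, handler, retry_delay_s: float = 5.0) -> None:
    """Recreate and restart the consumer after a failure instead of exiting."""
    while True:
        consumer = make_consumer()
        await consumer.start()  # a failure to start still propagates
        try:
            await consume(consumer, handler)
            return  # iteration ended normally, e.g. the consumer was stopped elsewhere
        except asyncio.CancelledError:
            raise  # on Python 3.7 CancelledError is an Exception subclass
        except Exception:
            log.exception("Consumer failed; restarting in %.1f s", retry_delay_s)
            await asyncio.sleep(retry_delay_s)
```

Restarting is a workaround rather than a fix: if the member keeps getting evicted, the underlying cause (for example a blocked event loop or a session timeout that is too short) still needs to be found. Note also that with auto-commit enabled, which is aiokafka's default, offsets can be committed before the spawned handlers have finished.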

Any guidance on debugging the problem or tuning the configuration is welcome. Have a nice day :)
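One hypothesis worth testing (an assumption, not something the log proves): aiokafka sends heartbeats from a coroutine on the same event loop, so if a message handler blocks the loop (synchronous I/O, CPU-heavy work) for longer than the session timeout, the broker removes the member, which would be consistent with the "marking coordinator dead" and UnknownMemberIdError sequence above. asyncio's debug mode logs every callback or task step that runs longer than loop.slow_callback_duration. The sketch below turns that on and raises the "aiokafka" loggers to DEBUG; make_debug_loop and the threshold values are illustrative.

```python
import asyncio
import logging
import time


def make_debug_loop(slow_callback_s: float = 0.5) -> asyncio.AbstractEventLoop:
    """Create an event loop that warns about steps blocking it longer than slow_callback_s."""
    logging.basicConfig(level=logging.INFO)
    # aiokafka modules log under the "aiokafka" namespace; DEBUG adds
    # heartbeat, rebalance and commit details.
    logging.getLogger("aiokafka").setLevel(logging.DEBUG)

    loop = asyncio.new_event_loop()
    # In debug mode asyncio logs "Executing <...> took X seconds" (logger
    # "asyncio", level WARNING) for any step slower than slow_callback_duration.
    loop.set_debug(True)
    loop.slow_callback_duration = slow_callback_s
    return loop


if __name__ == "__main__":
    demo_loop = make_debug_loop(slow_callback_s=0.05)

    async def blocking_handler() -> None:
        # A synchronous sleep blocks the whole loop; a heartbeat task
        # scheduled on this loop could not run during this time either.
        time.sleep(0.1)

    demo_loop.run_until_complete(blocking_handler())  # emits the slow-step warning
    demo_loop.close()
```

In the real service, the loop passed as loop=self.loop would be created this way, and any "took X seconds" warning close to or above session_timeout_ms would point at the handler that starves the heartbeat.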

OneCricketeer
Victor Cadena
  • Please check the discussion in this GitHub issue: https://github.com/aio-libs/aiokafka/issues/575 – Victor Cadena Mar 09 '21 at 15:53
  • Have you been able to resolve this? We are seeing the same thing in our production environment with aiokafka==0.70, kafka-python==2.0.2 and Kafka 2.4. – Ajay M Apr 30 '21 at 18:06
