I've been banging my head against the wall today trying to figure out why this isn't working. I created this multiprocessing class:
    import asyncio
    import json
    import logging
    import multiprocessing

    from aiokafka import AIOKafkaConsumer

    class Consumer(multiprocessing.Process):
        def __init__(self, topic, **kwargs):
            self.topic = topic
            super(Consumer, self).__init__(**kwargs)

        def _deserializer(serialized):
            return json.loads(serialized)

        async def _consume(self):
            consumer = AIOKafkaConsumer(
                self.topic,
                # group_id=None,
                group_id="Deployment",
                value_deserializer=self._deserializer,
                bootstrap_servers='localhost:30322',
            )
            await consumer.start()
            tasks = []
            try:
                async for msg in consumer:
                    logging.info("***** reading message *****")
                    # process_msg is defined elsewhere in my code
                    tasks.append(asyncio.create_task(process_msg(msg, 1)))
            finally:
                await consumer.stop()
                await asyncio.gather(*tasks)

        def run(self):
            asyncio.run(self._consume())
And my main file does this:
    num_procs = 1
    processes = [Consumer("deployment_requests") for _ in range(num_procs)]
    for p in processes:
        p.start()
    for p in processes:
        logging.info(f'pid is {p.pid}')
    for p in processes:
        p.join()
        logging.info(f'pid is {p.pid}')
And this is the output:
2023-05-02 16:03:58 - INFO - pid is 23520
2023-05-02 16:04:03 - INFO - Updating subscribed topics to: frozenset({'deployment_requests'})
2023-05-02 16:04:03 - INFO - Discovered coordinator 0 for group Deployment
2023-05-02 16:04:03 - INFO - Revoking previously assigned partitions set() for group Deployment
2023-05-02 16:04:03 - INFO - (Re-)joining group Deployment
2023-05-02 16:04:03 - INFO - Joined group 'Deployment' (generation 182) with member_id aiokafka-0.8.0-e331a252-b6cd-4521-9140-6bb70cf9e838
2023-05-02 16:04:03 - INFO - Elected group leader -- performing partition assignments using roundrobin
2023-05-02 16:04:03 - INFO - Successfully synced group Deployment with generation 182
2023-05-02 16:04:03 - INFO - Setting newly assigned partitions {TopicPartition(topic='deployment_requests', partition=0)} for group Deployment
2023-05-02 16:04:03 - INFO - LeaveGroup request succeeded
Process Consumer-1:
2023-05-02 16:04:04 - INFO - pid is 23520
If I take the code out of the class, it works as expected. As is, it never even prints "***** reading message *****", so it doesn't appear to be waiting on messages at all. My guess is that p.start() isn't invoking run() correctly for the asyncio call, but it could also be something completely different :)
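One thing that may be worth ruling out independently of multiprocessing and asyncio: `_deserializer` is defined without a `self` parameter but is passed as `self._deserializer`, so any caller that hands it a single value would hit a `TypeError`. Below is a minimal sketch of that behaviour, with no Kafka involved. The class names `Broken` and `Fixed` and the helper `try_deserialize` are hypothetical and exist only for this illustration; whether this is what actually stops the consumer is an assumption, not something the logs confirm.

```python
import json


class Broken:
    # Mirrors the method in Consumer: defined without a `self` parameter.
    def _deserializer(serialized):
        return json.loads(serialized)


class Fixed:
    # A staticmethod receives no implicit `self`, so one argument is enough.
    @staticmethod
    def _deserializer(serialized):
        return json.loads(serialized)


def try_deserialize(obj, raw):
    """Call obj._deserializer(raw); return the result or a description of the TypeError."""
    try:
        return obj._deserializer(raw)
    except TypeError as exc:
        return f"TypeError: {exc}"


raw = b'{"id": 1}'
print(try_deserialize(Broken(), raw))  # the bound method gets (self, raw): two arguments
print(try_deserialize(Fixed(), raw))   # the staticmethod gets raw only
```

If the first call reports a `TypeError` about the number of positional arguments, the same would apply to `value_deserializer=self._deserializer` as soon as a message value is deserialized.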
Here are the logs from the Kafka side (these come from the broker's GroupCoordinator). I don't see any issues on the producer side.
[2023-05-02 21:04:03,943] INFO [GroupCoordinator 0]: Stabilized group Deployment generation 182 (__consumer_offsets-15) (kafka.coordinator.group.GroupCoordinator)
[2023-05-02 21:04:03,947] INFO [GroupCoordinator 0]: Assignment received from leader for group Deployment for generation 182 (kafka.coordinator.group.GroupCoordinator)
[2023-05-02 21:04:03,966] INFO [GroupCoordinator 0]: Member[group.instance.id None, member.id aiokafka-0.8.0-e331a252-b6cd-4521-9140-6bb70cf9e838] in group Deployment has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2023-05-02 21:04:03,966] INFO [GroupCoordinator 0]: Preparing to rebalance group Deployment in state PreparingRebalance with old generation 182 (__consumer_offsets-15) (reason: removing member aiokafka-0.8.0-e331a252-b6cd-4521-9140-6bb70cf9e838 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
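A note on the `Process Consumer-1:` line in the consumer output: that is the header multiprocessing writes to stderr just before a traceback when `run()` raises, so the traceback itself may simply not have been captured alongside the logging output. A minimal sketch for routing such an exception through `logging` instead follows. The names `run_logged` and `failing_task` are hypothetical, and `failing_task` is only a stand-in for `_consume()`.

```python
import asyncio
import logging


async def failing_task():
    # Stand-in for the real _consume() coroutine; raises on purpose.
    raise TypeError("simulated failure inside the coroutine")


def run_logged(coro_factory):
    """Run a coroutine to completion, logging any exception with its traceback before re-raising."""
    try:
        return asyncio.run(coro_factory())
    except Exception:
        logging.exception("consumer coroutine failed")
        raise


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    try:
        run_logged(failing_task)
    except TypeError:
        pass  # already logged with its traceback by run_logged
```

In the class above, the equivalent change would be to have `run()` call something like `run_logged(self._consume)`, so that a failure in the child process shows up in the same log stream as the other messages.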