Questions tagged [aiokafka]

Apache Kafka client for asyncio

14 questions
7
votes
1 answer

Connect Python to MSK with IAM role-based authentication

I've written a Python script with aiokafka to produce and consume from a Kafka cluster in AWS MSK. I'm running the script from an EC2 instance that is in the same VPC as my cluster, and when I try to connect my script to a cluster it refuses to accept…
2
votes
0 answers

Error "Auto offset commit failed: [Error 25] UnknownMemberIdError:" after days of use

This problem always occurs after the microservices have been communicating with Kafka for a few days. I have 3 nodes, and for each microservice I use a group id on a specific topic. The error is as follows: Unable connect to node with id 1: Failed fetch messages…
Plaoo
  • 417
  • 3
  • 19
1
vote
1 answer

unable to consume asynchronous msg from producer & consumer

Kafka and ZooKeeper are running successfully. This is my producer.py: async def publish(): producer = AIOKafkaProducer(bootstrap_servers='localhost:9092', enable_idempotence=True) await producer.start() consumer = AIOKafkaConsumer( …
Tanjin Alam
  • 1,728
  • 13
  • 15
1
vote
0 answers

Kafka consumer records - processing

With Kafka I'm using getmany to read the consumer messages. Out of a total of 650 messages (which will take around 3 days to process), processing happens for around 100-150 records (sometimes 12 hrs, sometimes 24 hrs) and then there is no further…
evolver
  • 11
  • 2
1
vote
0 answers

Aiokafka connecting fails silently

I'm trying to connect an aiokafka consumer with the following config. kafka_config = { "bootstrap_servers": "b-2.cpkafkacluster-2106.vi24p2.c1.kafka.eu-central-1.amazonaws.com:9096", "security_protocol": "SASL_SSL", …
jacksbox
  • 911
  • 1
  • 11
  • 24
0
votes
0 answers

Python aiokafka - how to set max retry limit per message?

I use aiokafka 0.8.0 in Python 3.9 to consume a Kafka topic. Occasionally the processing fails with what we would normally consider a Retryable exception, so the message is sent again soon after. In some cases the exception is not genuinely…
Brendan Hill
  • 3,406
  • 4
  • 32
  • 61
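
The excerpt above is truncated, so how the message gets redelivered is not known here. As a rough sketch only, one way to cap attempts in application code (independent of any aiokafka setting) is to wrap the processing call in a bounded retry loop. `RetryableError`, `handle_with_limit`, and the `handler` callback are hypothetical names introduced for illustration; they are not part of aiokafka.

```python
import asyncio


class RetryableError(Exception):
    """Hypothetical marker for failures that are worth retrying."""


async def handle_with_limit(message, handler, max_attempts=3, delay=0.0):
    """Call handler(message) at most max_attempts times.

    Returns True if one attempt succeeded, False if every attempt raised
    RetryableError. Any other exception propagates immediately.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            await handler(message)
            return True
        except RetryableError:
            if attempt == max_attempts:
                # Give up; the caller decides what to do with the message
                # (log it, skip it, forward it somewhere else).
                return False
            await asyncio.sleep(delay)
    return False
```

In a consumer loop, the return value could decide whether the message is treated as handled or set aside, so that a message that never succeeds does not get retried forever.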
0
votes
1 answer

aiokafka exits when running in a multiprocessing class

I've been banging my head against the wall today trying to figure out why this isn't working. I created this multiprocessing class: class Consumer(multiprocessing.Process): def __init__(self, topic, **kwargs): self.topic = topic …
Jon Hayden
  • 153
  • 1
  • 1
  • 7
0
votes
0 answers

How to correctly manage FastAPI WebSockets with an AIOKafka Consumer

I have this simple websocket endpoint in FastAPI that consumes data from a Kafka server with the AIOKafka package and sends it through the websocket. @router.websocket("/ws/{client_id}") async def websocket_endpoint(websocket: WebSocket, client_id:…
luis2105
  • 1
  • 2
0
votes
0 answers

python aiokafka many consumers to many producers

I use aiokafka to consume from Kafka, filter message fields, and produce messages back to Kafka. I run 4 async consumers which put messages on an async queue. Then a single process consumes that queue and produces to an async output_queue. Multiple produces…
Dariusz Krynicki
  • 2,544
  • 1
  • 22
  • 47
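
The layout described in this excerpt (several consumers feeding one queue, a single filter stage, and an output queue) can be sketched with plain `asyncio.Queue` objects. This is an illustration under assumptions, not the asker's code: in-memory lists stand in for the Kafka consumers, a list stands in for the producer, only one producer task is shown, and all function names are hypothetical.

```python
import asyncio

_STOP = object()  # sentinel that tells downstream stages to shut down


async def consume(source, in_q):
    # Stand-in for an aiokafka consumer loop: push each message to the queue.
    for msg in source:
        await in_q.put(msg)


async def filter_stage(in_q, out_q, keep_fields):
    # Single stage that keeps only the wanted fields of each message.
    while True:
        msg = await in_q.get()
        if msg is _STOP:
            await out_q.put(_STOP)
            return
        await out_q.put({k: v for k, v in msg.items() if k in keep_fields})


async def produce(out_q, sink):
    # Stand-in for an aiokafka producer: collect the filtered messages.
    while True:
        msg = await out_q.get()
        if msg is _STOP:
            return
        sink.append(msg)


async def run_pipeline(sources, keep_fields):
    # Bounded queues apply backpressure if a later stage falls behind.
    in_q = asyncio.Queue(maxsize=100)
    out_q = asyncio.Queue(maxsize=100)
    sink = []
    filt = asyncio.create_task(filter_stage(in_q, out_q, keep_fields))
    prod = asyncio.create_task(produce(out_q, sink))
    await asyncio.gather(*(consume(s, in_q) for s in sources))
    await in_q.put(_STOP)  # all consumers are done
    await asyncio.gather(filt, prod)
    return sink
```

Calling `asyncio.run(run_pipeline(sources, {"a"}))` with a list of message lists returns the filtered messages. The sentinel lets every stage drain its queue before exiting, so no message is dropped at shutdown.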
0
votes
1 answer

Aiokafka consumer process events in parallel

I'm just getting started with Kafka. I have a k8s cluster in which I want to deploy event listeners. When I have one listener running, everything works fine, but with several pods they process events in parallel. I would like the event to be…
0
votes
0 answers

Aiokafka consumer stuck in infinite loop

Hi, I am using an AioKafka consumer to read messages published by another process. The other process has published just one message, and my consumer code is reading the same message infinitely. I tried to use manual commit, but in vain. I am using library:…
0
votes
0 answers

python fastapi, aiokafka starvation problem

Problem: the aiokafka consumer is starving the FastAPI endpoint, due to which our Kubernetes liveness probes are failing and any other service calling the exposed endpoints is timing out. Details: There is a Kafka consumer, which starts during…
narendra
  • 121
  • 1
  • 6
0
votes
0 answers

aiokafka producer running on one core in a python:buster Docker container achieves very low throughput 1 MiB per second

I have some simple code to test the performance of the aiokafka library. I am using a Windows computer, running Docker for Windows, and a virtual machine with 8 cores. The aiokafka library seems to achieve absurdly low producer throughput in this…
tacos_tacos_tacos
  • 10,277
  • 11
  • 73
  • 126
0
votes
1 answer

Which Partition to use when Performing an Offset Seek using AIOKafkaConsumer?

When trying to let an AIOKafkaConsumer start reading messages from a specific offset starting_offset, how do we know which partition to use? I am trying to use the AIOKafkaConsumer.seek method, but it requires a TopicPartition to be specified…