I've written a Python script with aiokafka to produce to and consume from a Kafka cluster in AWS MSK. I'm running the script from an EC2 instance that is in the same VPC as my cluster, and when I try to connect my script to the cluster it refuses to accept…
This problem always occurs after the microservices have been communicating with Kafka for a few days. I have 3 nodes, and for each microservice I use a group id on a specific topic. The error is as follows.
Unable connect to node with id 1:
Failed fetch messages…
For Kafka I'm using getmany to read the consumer messages. Out of a total of 650 messages (which will take around 3 days to process), processing happens for around 100-150 records (sometimes over 12 hours, sometimes 24 hours) and then there is no further…
I'm trying to connect an aiokafka consumer with the following config.
kafka_config = {
    "bootstrap_servers": "b-2.cpkafkacluster-2106.vi24p2.c1.kafka.eu-central-1.amazonaws.com:9096",
    "security_protocol": "SASL_SSL",
…
I use aiokafka 0.8.0 in Python 3.9 to consume a Kafka topic.
Occasionally the processing fails with what we would normally consider a Retryable exception, so the message is sent again soon after.
In some cases the exception is not genuinely…
I've been banging my head against the wall today trying to figure out why this isn't working. I created this multiprocessing class:
class Consumer(multiprocessing.Process):
    def __init__(self, topic, **kwargs):
        self.topic = topic
…
I have this simple websocket endpoint in FastAPI that consumes data from a Kafka server with the aiokafka package and sends it through the websocket.
@router.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id:…
I use aiokafka to consume from Kafka, filter message fields, and produce messages back to Kafka.
I run 4 async consumers which put messages on an async queue.
Then a single process consumes that queue and produces to an async output_queue.
Multiple produces…
I'm just getting started with Kafka. I have a k8s cluster in which I want to deploy event listeners. When I have one listener running, everything works fine, but with several pods they process events in parallel. I would like the event to be…
Hi, I am using an aiokafka consumer to read messages published by another process. The other process has published just one message, and my consumer code keeps reading that same message indefinitely. I tried to use manual commit, but in vain.
I am using library:…
Problem: the aiokafka consumer is starving the FastAPI endpoint, so our Kubernetes liveness probes are failing and any other service calling the exposed endpoints times out.
Details:
There is a Kafka consumer, which starts during…
I have some simple code to test the performance of the aiokafka library. I am using a Windows computer, running Docker for Windows, and a virtual machine with 8 cores.
The aiokafka library seems to achieve absurdly low producer throughput in this…
When trying to let an AIOKafkaConsumer start reading messages from a specific offset starting_offset, how do we know which partition to use?
I am trying to use the AIOKafkaConsumer.seek method, but it requires a TopicPartition to be specified…
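Since offsets in Kafka are tracked per partition, one possible approach is to ask the consumer which partitions it currently holds and seek each of them. The following is only a minimal sketch, not a definitive answer: the helper name seek_assigned_partitions, the topic "my_topic", the broker address "localhost:9092", and the offset 100 are all hypothetical, and it assumes the assignment is already available once start() has returned.

```python
import asyncio


def seek_assigned_partitions(consumer, starting_offset):
    """Seek every partition currently assigned to `consumer` to `starting_offset`.

    Hypothetical helper: it relies only on the consumer's assignment() and
    seek() methods and returns the partitions it touched, in sorted order.
    """
    partitions = sorted(consumer.assignment())
    for tp in partitions:
        consumer.seek(tp, starting_offset)
    return partitions


async def main():
    # Imported here so the helper above can be tried without a broker.
    from aiokafka import AIOKafkaConsumer

    # "my_topic" and "localhost:9092" are placeholder values (assumptions).
    consumer = AIOKafkaConsumer("my_topic", bootstrap_servers="localhost:9092")
    await consumer.start()
    try:
        # Assumes partitions have been assigned by the time start() returns.
        seek_assigned_partitions(consumer, starting_offset=100)
        async for msg in consumer:
            print(msg.partition, msg.offset, msg.value)
    finally:
        await consumer.stop()


# To try it against a real broker: asyncio.run(main())
```

If only one partition should be repositioned, a single TopicPartition(topic, partition) can be built by hand and passed to seek instead; the same offset number generally refers to different messages on different partitions.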