
Problem: an aiokafka consumer is starving the FastAPI endpoints. As a result, our Kubernetes liveness probes fail and other services calling the exposed endpoints time out.

Details:

A Kafka consumer starts in the FastAPI startup event and keeps listening to a particular topic.

A FastAPI endpoint serves the incoming requests.

When there are a lot of messages in the Kafka topic partition, the consumer starves the event loop and requests to the FastAPI endpoints time out.

How can we solve this problem?

import asyncio
# all the other imports

consumer = None
consumer_task = None

log = None

def get_application():
    # initialize the FastAPI app, register the routes, and do other setup
    return app

app = get_application()

@app.on_event("startup")
async def startup_event():
    # initialize the consumer
    await initialize()
    # start consuming (this only schedules the consumer task and returns)
    await consume()

@app.on_event("shutdown")
async def shutdown_event():
    # close the consumer (body omitted here)
    ...

async def initialize():
    # initialize the consumer
    # get cluster layout and join group
    await consumer.start()
    await consumer.seek_to_committed()

async def consume():
    global consumer_task
    loop = asyncio.get_event_loop()
    consumer_task = loop.create_task(send_consumer_message(consumer))


async def send_consumer_message(consumer):
    try:
        # consume messages
        async for msg in consumer:
            # do message processing
            ...
    except Exception as e:
        log.error(f"message consuming failed with error: {e!r}")
    finally:
        log.warning("stopping consumer")
        await consumer.stop()
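
One possible cause of this symptom is that the message processing inside `async for` is synchronous (CPU-bound or blocking I/O) and the iteration itself rarely suspends while a backlog is buffered, so the loop never gets back to the HTTP handlers. A minimal sketch, assuming that is the case here: hand the blocking work to a worker thread with `asyncio.to_thread` (Python 3.9+). `fake_messages`, `process_message`, `heartbeat`, and `run_consumer` are hypothetical stand-ins for the aiokafka consumer, the real processing, and a FastAPI endpoint; none of them come from the post.

```python
import asyncio
import time


async def fake_messages(count):
    """Hypothetical stand-in for `async for msg in consumer` with a full backlog."""
    for i in range(count):
        yield i  # never suspends, like a consumer that already has data buffered


def process_message(msg):
    """Hypothetical blocking work (CPU-bound or synchronous I/O)."""
    time.sleep(0.01)
    return msg * 2


async def heartbeat(ticks, stop):
    """Stands in for a request handler: records each time the loop lets it run."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.005)


async def run_consumer(offload):
    """Consume 20 fake messages; return how often the heartbeat got to run."""
    ticks = []
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # let the heartbeat start
    async for msg in fake_messages(20):
        if offload:
            # The event loop stays free while a worker thread does the work.
            await asyncio.to_thread(process_message, msg)
        else:
            # Blocks the event loop for the whole call.
            process_message(msg)
    stop.set()
    await hb
    return len(ticks)


if __name__ == "__main__":
    blocked = asyncio.run(run_consumer(offload=False))
    offloaded = asyncio.run(run_consumer(offload=True))
    print(f"heartbeat ticks, blocking: {blocked}, offloaded: {offloaded}")
```

In the blocking variant the heartbeat only runs before the backlog is drained; in the offloaded variant it keeps ticking between messages. For CPU-heavy pure-Python processing a thread still competes for the GIL, so a process pool via `loop.run_in_executor` may be a better fit.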
narendra
  • Do you have a proper example that shows how you've structured your application? – MatsLindh Oct 11 '22 at 09:52
  • Updated the description with the code. – narendra Oct 11 '22 at 10:52
  • While I'm far from an expert (or an advanced user) in managing the event loop, have you seen the note in https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.get_event_loop about using `get_running_loop` instead of `get_event_loop`, in particular inside coroutines? Could that be the issue, that you have two loops that starve each other instead of having them in the same eventloop? – MatsLindh Oct 11 '22 at 11:19
  • I come from a JavaScript background. My understanding is that the starvation happens because callbacks (can I say coroutines?) are added to a queue and picked up in order by the event loop. When I send a new request, its coroutine is added at the end of the queue, and while the event loop is busy picking up Kafka messages, my request times out. That's what I understood. – narendra Oct 11 '22 at 11:29
  • In JavaScript there are microtasks, which are given higher priority (depending on the event loop implementation); I don't know whether Python has anything like that. In Python there doesn't seem to be a way to prioritize certain tasks. – narendra Oct 11 '22 at 11:32
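
On the queueing and priority points in the comments above: asyncio's default event loop runs ready callbacks in FIFO order and has no built-in task priorities, and a task only gives up the loop at an `await` that actually suspends. A minimal sketch of explicit cooperative yielding with `await asyncio.sleep(0)`; `worker`, `handler`, and `run` are hypothetical names used only for illustration.

```python
import asyncio


async def worker(log, yield_every):
    """Hypothetical message loop; yields every `yield_every` items (0 = never)."""
    for i in range(6):
        log.append(f"msg{i}")
        if yield_every and (i + 1) % yield_every == 0:
            await asyncio.sleep(0)  # hand control back to the event loop


async def handler(log):
    """Stands in for a request handler waiting for its turn on the loop."""
    log.append("request")


async def run(yield_every):
    """Schedule the worker first, then the handler, and record the run order."""
    log = []
    w = asyncio.create_task(worker(log, yield_every))
    h = asyncio.create_task(handler(log))
    await asyncio.gather(w, h)
    return log


if __name__ == "__main__":
    print(asyncio.run(run(0)))  # handler runs only after all six messages
    print(asyncio.run(run(2)))  # handler runs after the first two messages
```

Without a yield point the handler waits for the entire batch; with one every two messages it runs at the first opportunity. Separately, inside a coroutine `asyncio.get_event_loop()` returns the loop that is already running, so the posted `consume()` does not create a second loop.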

0 Answers