
I have a Python Kafka consumer application that consumes messages and then calls an external web service synchronously. The web service takes about a minute to process each message and send its response.

Is there a way to consume a message, send the request to the web service, and consume the next message without waiting for the response?


from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'spring_test',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: loads(x.decode('utf-8')))

This is how I wait for the messages and send the external web request:


def consume_msgs():
    for message in consumer:
        message = message.value
        send('{}'.format(message))

consume_msgs()

The send() function takes about a minute to return a response. In the meantime I want to start consuming the next message asynchronously, but I don't know where to start. Here is send():

import requests

def send(pload):
    r = requests.post('someurl', data=pload)
    print(r)
InfoLearner
  • Could you share your send function? – William Hammond Oct 29 '20 at 20:22
  • Sure. Just edited the question – InfoLearner Oct 29 '20 at 20:28
  • I'm not going to post an answer yet, since I can't recall whether there are any major differences between how the Python consumer handles batching and how the Java consumer does, but I believe if you make async network calls on a batch of messages using something like https://stackoverflow.com/questions/22190403/how-could-i-use-requests-in-asyncio, you can await the completion of the batch and then call `commit` afterwards. – William Hammond Oct 29 '20 at 20:30
  • Actually, it looks like the people who made AIO also have an async Kafka consumer: https://github.com/aio-libs/aiokafka – William Hammond Oct 29 '20 at 20:32
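A minimal sketch of the batch-then-commit idea from the comments, assuming kafka-python's KafkaConsumer.poll() and commit() and a consumer created with enable_auto_commit=False (the question's consumer uses True). Each blocking send() in a polled batch runs in a worker thread via asyncio's run_in_executor, in the spirit of the linked requests-in-asyncio question, and offsets are committed only after every request in the batch has returned. The names send_batch, poll_and_process and consume_in_batches, and the max_records=20 default, are hypothetical.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List


async def send_batch(values: List[Any], send: Callable[[str], Any]) -> List[Any]:
    # Give every message in the batch its own worker thread so the blocking
    # send() calls overlap instead of running one after another.
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=max(1, len(values))) as pool:
        futures = [loop.run_in_executor(pool, send, '{}'.format(v)) for v in values]
        # Completes when every request has returned; re-raises the first error.
        return await asyncio.gather(*futures)


def poll_and_process(consumer, send: Callable[[str], Any], max_records: int = 20) -> int:
    # Assumes a kafka-python KafkaConsumer created with enable_auto_commit=False.
    # poll() returns {TopicPartition: [ConsumerRecord, ...]}.
    batches = consumer.poll(timeout_ms=1000, max_records=max_records)
    values = [record.value for records in batches.values() for record in records]
    if values:
        asyncio.run(send_batch(values, send))
        # Reached only if the whole batch succeeded; if send() raised above,
        # the exception propagates and nothing is committed for this batch.
        consumer.commit()
    return len(values)


def consume_in_batches(consumer, send: Callable[[str], Any]) -> None:
    # Hypothetical driver loop: poll, process the batch concurrently, commit, repeat.
    while True:
        poll_and_process(consumer, send)
```

It would be called as consume_in_batches(consumer, send). The trade-off of this design is that the next batch is not polled until the slowest request in the current one returns, so with one-minute requests throughput is roughly max_records messages per minute.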

1 Answer


Not sure if this is what you need, but could you just spin each call to send() out into its own thread? Something like the below. This way the for loop continues without waiting for send() to return. You may have to throttle the number of threads somehow if you are consuming data far faster than you are processing it.

from threading import Thread


def consume_msgs():
    for message in consumer:
        message = message.value
        Thread(target=send, args=('{}'.format(message),)).start()

consume_msgs()
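On the throttling point, one way to cap concurrency is a fixed-size ThreadPoolExecutor plus a BoundedSemaphore: the loop blocks once the limit of in-flight requests is reached, instead of queueing an unbounded number of messages in memory. This is a sketch, not a tested drop-in; MAX_IN_FLIGHT = 10 is a hypothetical limit, and the function takes the message iterable and send as arguments so it can be called as consume_msgs(consumer, send). Note that with enable_auto_commit=True, as in the question, offsets can be committed before a background send() has finished, so a crash could drop messages that were consumed but never sent.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable

MAX_IN_FLIGHT = 10  # hypothetical limit; tune to what the web service can handle


def consume_msgs(messages: Iterable[Any], send: Callable[[str], Any],
                 max_in_flight: int = MAX_IN_FLIGHT) -> None:
    # The semaphore caps how many send() calls are pending at once. Without it
    # the executor would accept every message and queue them all in memory.
    slots = threading.BoundedSemaphore(max_in_flight)

    def run(payload: str) -> None:
        try:
            send(payload)
        finally:
            slots.release()  # free the slot even if send() raises

    # Leaving the with-block waits for all submitted sends to finish.
    with ThreadPoolExecutor(max_workers=max_in_flight) as pool:
        for message in messages:
            slots.acquire()  # blocks the loop while max_in_flight sends are pending
            pool.submit(run, '{}'.format(message.value))
```

Exceptions raised by send() are captured in the Future returned by pool.submit() and are not re-raised here; keep those futures and call result() on them if you need to see failures.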
Simon Notley