
I am writing a Python application that reads from a Kafka topic with a consumer. For each message, it does some work that can take a while to complete before it moves on to the next message.

Most applications using the multiprocessing library pass some finite iterable to map_async or apply_async. My attempts to solve this problem with those two functions don't seem to work, I think because the iterable in this case is the Kafka topic, which is an unbounded queue. Is there a way to 'do some stuff' in a non-blocking manner in this kind of scenario?

A.A.
  • A working pattern for processing infinite streams is to initially distribute as many tasks as you have workers, then submit new ones as old ones finish. My answer [here](https://stackoverflow.com/a/54702118/9059420) shows a non-blocking way to do it with `pool.apply_async`. – Darkonaut Feb 20 '19 at 16:56
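The pattern described in the comment above can be sketched roughly as follows. This is not code from the linked answer; it is a minimal illustration that uses a semaphore to keep at most one in-flight task per worker, with a plain range standing in for the Kafka topic:

```python
import multiprocessing as mp
from threading import Semaphore


def do_stuff(item):
    # placeholder for the slow per-message work
    return item * 2


def process_stream(stream, n_workers=4):
    # keep at most n_workers tasks in flight; the callback frees a slot
    results = []
    slots = Semaphore(n_workers)

    def on_done(result):
        # runs in the pool's result-handler thread in the parent process
        results.append(result)
        slots.release()

    with mp.Pool(n_workers) as pool:
        for item in stream:
            slots.acquire()  # blocks only while all workers are busy
            pool.apply_async(do_stuff, (item,), callback=on_done)
        pool.close()
        pool.join()
    return results
```

With a real consumer, the `for item in stream` loop would be replaced by the `c.poll()` loop; submission stays non-blocking as long as a worker is free.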

1 Answer


You can create a child process for each message and hand the work off to it, so the main loop is free to poll the next message:

from confluent_kafka import Consumer, KafkaError
from multiprocessing import Process


def do_stuff(msg):
    # the slow per-message work runs here, in its own process
    my_stuff = 'is doing here as a non-blocking way'

c = Consumer({
    'bootstrap.servers': 'mybroker',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['mytopic'])

try:
    while True:
        msg = c.poll(1.0)

        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        # start a child process so the loop can poll the next message
        # without waiting for this one to finish
        process = Process(target=do_stuff, args=(msg.value().decode('utf-8'), ))
        process.start()
finally:
    c.close()
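One caveat with this pattern: each Process that finishes stays as a zombie until it is joined, and the loop above never joins them. Calling `multiprocessing.active_children()` joins already-finished children as a side effect, so invoking it once per loop iteration keeps them reaped. A minimal sketch with a simulated workload (`consume` and the message list are illustrative stand-ins, not part of the answer above):

```python
import time
from multiprocessing import Process, active_children


def do_stuff(value):
    time.sleep(0.05)  # stand-in for the slow per-message work


def consume(messages):
    # `messages` stands in for values pulled off the topic
    for value in messages:
        Process(target=do_stuff, args=(value,)).start()
        # side effect of active_children(): finished children are joined,
        # so zombies don't pile up while the loop keeps polling
        active_children()
    # drain whatever is still running before shutting down
    for p in active_children():
        p.join()
    return len(active_children())
```

Note also that spawning one process per message is unbounded under load; a fixed-size Pool (as in the comment on the question) caps resource usage.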
Amin