
I am using pykafka and have a producer that produces asynchronously with delivery_reports enabled. I know the delivery reports must be read using the get_delivery_report method, and I know it has to be called in the same thread that produced the message. However, does get_delivery_report have to be called after each call to produce, or can it be called a single time? Will get_delivery_report return all of the failed sends if more than one occurs? For example, say I send 100 messages asynchronously:

for x in range(100):
    with topic.get_producer(delivery_reports=True, sync=False) as producer:
        producer.produce(b"Test Message")

msg, exc = producer.get_delivery_report()

or does it have to be:

for x in range(100):
    with topic.get_producer(delivery_reports=True, sync=False) as producer:
        producer.produce(b"Test Message")
        msg, exc = producer.get_delivery_report()

The first seems to run much faster than the second.

GregH

1 Answer


Aside from the issue with this code's overuse of topic.get_producer (addressed in my answer here), the reason the first example runs much faster than the second is that the second is effectively running in synchronous mode. get_delivery_report blocks by default (block=True), so every message produced results in a wait for its delivery confirmation before the next message can be produced. If you're writing an application that produces asynchronously, you probably want something closer to the first example. The correct way to do this is laid out in the pykafka readme:

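# Assumes `topic` is a pykafka Topic obtained from a KafkaClient.
try:
    import Queue as queue  # Python 2
except ImportError:
    import queue  # Python 3

with topic.get_producer(delivery_reports=True) as producer:
    count = 0
    while True:
        count += 1
        # pykafka expects bytes for both the message and the partition key
        producer.produce(b'test msg', partition_key=str(count).encode())
        if count % 10 ** 5 == 0:  # adjust this or bring lots of RAM ;)
            # Drain whatever reports have arrived so far, without blocking.
            while True:
                try:
                    msg, exc = producer.get_delivery_report(block=False)
                    if exc is not None:
                        print('Failed to deliver msg {}: {}'.format(
                            msg.partition_key, repr(exc)))
                    else:
                        print('Successfully delivered msg {}'.format(
                            msg.partition_key))
                except queue.Empty:
                    break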

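This code produces 10 ** 5 messages asynchronously, then stops producing to drain the delivery report queue, which receives one report per message produced. Each call to get_delivery_report returns a single (msg, exc) tuple, so you call it repeatedly; with block=False it raises the standard library's Empty exception (Queue.Empty in Python 2, queue.Empty in Python 3) as soon as no report is waiting, which ends the inner loop. The code prints the outcome of every delivery, failures included, and resumes producing once the queue is empty. Reports for messages still in flight at that moment are not lost; they are picked up at the next drain. The 10 ** 5 number can be adjusted depending on your memory limitations: it effectively works as a rough bound on the size of the delivery reports queue.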
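To address the original question more directly: get_delivery_report hands back one (msg, exc) report per call, so a single call will not return every failed send. One way to collect all of them is to produce everything with one producer and then call get_delivery_report once per message produced. The sketch below assumes no broker is available, so it uses FakeProducer and FakeMessage, hypothetical stand-ins that only imitate produce() and get_delivery_report(block, timeout) and fail every third message on purpose. With pykafka you would use a producer from topic.get_producer(delivery_reports=True) instead, and read the reports from the same thread that produced them.

```python
import queue
import threading
from collections import namedtuple

# Hypothetical stand-in for pykafka's message objects; only the attribute
# this example reads is included.
FakeMessage = namedtuple("FakeMessage", ["value", "partition_key"])


class FakeProducer(object):
    """Hypothetical stand-in for a pykafka producer with delivery_reports=True.

    produce() returns immediately; the (msg, exc) report for each message is
    queued a little later by a timer thread, imitating asynchronous delivery.
    Every third message gets a simulated failure.
    """

    def __init__(self, delay=0.01):
        self._reports = queue.Queue()
        self._delay = delay
        self._count = 0

    def produce(self, message, partition_key=None):
        self._count += 1
        exc = RuntimeError("simulated failure") if self._count % 3 == 0 else None
        report = (FakeMessage(message, partition_key), exc)
        threading.Timer(self._delay, self._reports.put, args=(report,)).start()

    def get_delivery_report(self, block=True, timeout=None):
        # Raises queue.Empty if no report is available (non-blocking or timed out).
        return self._reports.get(block, timeout)


def produce_and_collect(producer, n):
    """Produce n messages without waiting, then read exactly n reports."""
    for i in range(n):
        producer.produce(b"Test Message", partition_key=str(i).encode())
    failures = []
    for _ in range(n):  # exactly one report per produced message
        msg, exc = producer.get_delivery_report(block=True, timeout=5)
        if exc is not None:
            failures.append((msg.partition_key, exc))
    return failures


if __name__ == "__main__":
    failed = produce_and_collect(FakeProducer(), 100)
    print("{} of 100 sends failed".format(len(failed)))  # 33 of 100 sends failed
```

Blocking with a timeout is a deliberate choice here: the loop knows exactly how many reports to expect, so it can wait for each one, and in the sketch a report that never arrives raises queue.Empty after the timeout instead of hanging forever. For an unbounded stream of messages, the non-blocking drain from the readme is the better fit.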

Emmett Butler