
I'm learning Python's multiprocessing module and I found this example:

from multiprocessing import Process, Queue
import time

def reader(queue):
    ## Read from the queue
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if msg == 'DONE':
            break

def writer(count, queue):
    ## Write to the queue
    for ii in xrange(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        queue = Queue()   # reader() reads from queue
                          # writer() writes to queue
        reader_p = Process(target=reader, args=(queue,))
        reader_p.daemon = True
        reader_p.start()        # Launch reader() as a separate python process

        _start = time.time()
        writer(count, queue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print "Sending %s numbers to Queue() took %s seconds" % (count, 
            (time.time() - _start))

I was wondering when queue.get() would return DONE, so I tried the following example:

#!/bin/env python
from multiprocessing import Process, Queue
import time

if __name__=='__main__':
  queue = Queue()

  print "Before 2x put"
  queue.put(10)
  queue.put(20)
  print "After 2x put"

  print "Before 1st get"
  print queue.get()
  print "After 1st get"

  print "Before 2nd get"
  print queue.get()
  print "After 2nd get"

  print "Before 3rd get"
  print queue.get()
  print "After 3rd get"

The last message from this script is "Before 3rd get"; after that the script hangs, and the only way to end it is to terminate it. This example shows that queue.get() is blocking (the code does not continue until the call returns). How is it possible that queue.get() returns DONE in the original code, given that it blocks like this?

EDIT

In reply to @KemyLand, who nicely explained what is going on here, this is the version that does not hang:

#!/bin/env python
from multiprocessing import Process, Queue
from Queue import Empty  # Python 2 module name; multiprocessing.Queue raises this exception
import time

if __name__=='__main__':
  queue = Queue()

  print "Before 2x put"
  queue.put(10)
  queue.put(20)
  print "After 2x put"

  print "Before 1st get"
  print queue.get()
  print "After 1st get"

  print "Before 2nd get"
  print queue.get()
  print "After 2nd get"

  print "Before 3rd get"
  try:
    print queue.get_nowait()
    print "After 3rd get"
  except Empty:
    # Nothing left in the queue: get_nowait() raises instead of blocking
    print "Queue is empty"
Wakan Tanka

1 Answer

This is very simple.

In your first code, the "protocol" agreed between reader and writer was: the writer sends any amount of data to the reader, then sends 'DONE'; the reader receives it and understands that the data transfer is complete.

In your second code, there was no agreed protocol between reader and writer, because the writer's point of view is "I send two objects and I'm done!", while the reader's point of view is "I receive three objects and I'm done!".
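As a rough illustration of the first protocol, here is a minimal sketch written for Python 3 (the question's code is Python 2). The SENTINEL constant and the run_reader helper are names made up for this example; they are not part of the original code. The reader stops only because the writer promises to send the sentinel last.

```python
from multiprocessing import Process, Queue

SENTINEL = "DONE"  # hypothetical name for the agreed end-of-data marker


def writer(count, queue):
    """Send `count` numbers, then the sentinel that ends the transfer."""
    for i in range(count):
        queue.put(i)
    queue.put(SENTINEL)


def reader(queue):
    """Block on get() until the sentinel arrives; return the item count."""
    received = 0
    while True:
        msg = queue.get()  # blocks until the writer puts something
        if msg == SENTINEL:
            return received
        received += 1


def run_reader(queue, results):
    """Hypothetical helper: report the reader's count back to the parent."""
    results.put(reader(queue))


if __name__ == "__main__":
    queue, results = Queue(), Queue()
    reader_p = Process(target=run_reader, args=(queue, results))
    reader_p.start()
    writer(1000, queue)
    print("reader received", results.get(), "items")
    reader_p.join()
```

If the writer skipped the final put(SENTINEL), the reader would block on get() forever, which is the same situation as the third get() in the second script.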

Because no part of the runtime environment can detect that a protocol error has occurred, the application simply blocks, waiting for data that will never come. Only the application itself can detect this situation, because only it knows the protocol it follows. For this purpose you can call get_nowait() on the queue (or get() with a timeout). If it can't extract an object from the queue immediately, it raises a Queue.Empty exception. To catch that exception you must import the Queue module (with a capital Q): multiprocessing.Queue is a separate implementation from Queue.Queue, but it reuses that module's Empty exception. (Note: this naming mess was fixed in Python 3, where the module is called queue.)
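A small sketch of that idea, written for Python 3, where the exception lives in the lowercase queue module. The drain function and its timeout parameter are assumptions made for this example. It uses get(timeout=...) rather than get_nowait() because the multiprocessing documentation notes that an item may take a brief moment to become available after put(), so an immediate get_nowait() can raise Empty even though data is on its way.

```python
import multiprocessing
import queue  # Python 3 name of the Python 2 `Queue` module


def drain(q, timeout=1.0):
    """Collect items until none arrives within `timeout` seconds.

    Hypothetical helper: the reader gives up by itself instead of
    blocking forever on a get() that will never be satisfied.
    """
    items = []
    while True:
        try:
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            # No more data arrived in time; treat the transfer as finished.
            return items


if __name__ == "__main__":
    q = multiprocessing.Queue()
    q.put(10)
    q.put(20)
    print(drain(q))
```

A timeout is only a fallback. When both sides are under your control, an explicit end marker such as 'DONE' is usually the more reliable protocol, since a slow writer can be mistaken for a finished one.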

3442
  • Oh, such a schoolboy mistake. I didn't realize that the writer sends `DONE`. To be honest, I did not fully understand what you said about the import. I've modified my question to include a working solution; can you please check it? Thank you very much. – Wakan Tanka Aug 24 '15 at 23:05
  • @WakanTanka: Of course it's correct! Tell us if you have other issues :). – 3442 Aug 24 '15 at 23:11
  • @KemmyLand, I'm learning the multiprocessing module, so I have many issues that I will keep posting ;) If you are interested, please check my profile: there are several questions from the past 24 hours, and so far nobody has pointed me in the right direction. Thank you very, very much. – Wakan Tanka Aug 24 '15 at 23:15
  • @WakanTanka: I'll go to your profile in a while. This is a great way for me to do something useful in an evening :). – 3442 Aug 24 '15 at 23:26