16

I'm trying to use a queue with the multiprocessing library in Python. After executing the code below, the print statements work, but the processes do not quit after I call join on the Queue, and they are still alive. How can I terminate the remaining processes?

Thanks!

import multiprocessing

def MultiprocessTest(self):
  print "Starting multiprocess."
  print "Number of CPUs",multiprocessing.cpu_count()

  num_procs = 4
  def do_work(message):
    print "work",message ,"completed"

  def worker():
    while True:
      item = q.get()
      do_work(item)
      q.task_done()

  q = multiprocessing.JoinableQueue()
  for i in range(num_procs):
    p = multiprocessing.Process(target=worker)
    p.daemon = True
    p.start()

  source = ['hi','there','how','are','you','doing']
  for item in source:
    q.put(item)
  print "q close"
  q.join()
  #q.close()
  print "Finished everything...."
  print "num active children:",multiprocessing.active_children()
FogleBird
aerain

5 Answers

9

try this:

import multiprocessing

num_procs = 4
def do_work(message):
  print "work",message ,"completed"

def worker():
  for item in iter( q.get, None ):  # loop until the None sentinel is read from the queue
    do_work(item)
    q.task_done()
  q.task_done()  # mark the sentinel itself as done

q = multiprocessing.JoinableQueue()
procs = []
for i in range(num_procs):
  procs.append( multiprocessing.Process(target=worker) )
  procs[-1].daemon = True
  procs[-1].start()

source = ['hi','there','how','are','you','doing']
for item in source:
  q.put(item)

q.join()  # wait until every item has been processed

for p in procs:
  q.put( None )  # one sentinel per worker, so each worker's loop exits

q.join()  # wait until the sentinels have been consumed

for p in procs:
  p.join()  # the workers have left their loops, so this returns

print "Finished everything...."
print "num active children:", multiprocessing.active_children()
Andrea Ligios
underrun
  • Is there any reason you are putting None into the queue after completion? I thought task_done() could help avoid that problem? I was trying to model my code after the example on the bottom of this page: http://docs.python.org/library/queue.html – aerain Jul 12 '11 at 23:54
  • Not rating the solution, but hinting at how to have it run: move the "q =" declaration line before its first usage in def worker() ... ;-) – Dilettant Apr 11 '14 at 09:13
  • @aerain - but it does work ... there is a reason I put None into the queue. The line `for item in iter( q.get, None ):` is key. It tells the loop itself to exit after getting the value of None from the queue. This is what makes the actual process exit. q.join waits for all task_done calls. p.join waits for the termination of the processes, which can only happen if something breaks out of the loop in the worker (or you call terminate on the process, but that is less ideal). – underrun Apr 11 '14 at 16:22
  • 2
    @Dilettant - no, that won't actually change anything. q is available in the global namespace when the processes are created, so the worker will actually have a copy of q when it is called. It would probably be better to specify `args=(q,)` in the call to `multiprocessing.Process`, because then we are explicitly sharing the item - which is a good habit to get into so you avoid accidentally sharing things you should/can not share (a sketch of this follows below). – underrun Apr 11 '14 at 16:31
  • I take the exact code from above, copy it and paste it into a file. Run it. When I hit the q.join() in the middle, processes are spawned in an endless loop and my machine locks up. I see nothing in the code that leads me to believe that would happen, but, I have done this twice. This is with Python 2.7.1. – Doo Dah Jan 15 '16 at 21:07
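
For illustration (not part of the answer above), here is a minimal sketch of the variant underrun's comment describes: the queue is passed explicitly via args=(q,), and the setup lives under an if __name__ == '__main__' guard, which also avoids the endless respawning reported in the last comment on platforms that use the spawn start method (e.g. Windows). do_work and the sample data are reused from the answer purely for illustration.

import multiprocessing

def do_work(message):
  print "work", message, "completed"

def worker(q):  # the queue is now an explicit argument
  for item in iter(q.get, None):  # stop when the None sentinel arrives
    do_work(item)
    q.task_done()
  q.task_done()  # account for the sentinel itself

if __name__ == '__main__':  # guard needed where the 'spawn' start method is used
  q = multiprocessing.JoinableQueue()
  procs = [multiprocessing.Process(target=worker, args=(q,)) for _ in range(4)]
  for p in procs:
    p.daemon = True
    p.start()
  for item in ['hi', 'there', 'how', 'are', 'you', 'doing']:
    q.put(item)
  q.join()  # wait for the real work to be marked done
  for p in procs:
    q.put(None)  # one sentinel per worker
  q.join()  # wait for the sentinels to be consumed
  for p in procs:
    p.join()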
6

Your workers need a sentinel to terminate, or they will just sit blocked on their reads. Note that polling the queue with a sleep loop, instead of joining the processes, lets you display status information, etc.
My preferred template is:

import multiprocessing
from time import sleep

num_procs = 4

def worker(q,nameStr):
  print 'Worker %s started' %nameStr
  while True:
     item = q.get()
     if item is None: # detect sentinel
       break
     print '%s processed %s' % (nameStr,item) # do something useful
     q.task_done()
  print 'Worker %s Finished' % nameStr
  q.task_done()

q = multiprocessing.JoinableQueue()
procs = []
for i in range(num_procs):
  nameStr = 'Worker_'+str(i)
  p = multiprocessing.Process(target=worker, args=(q,nameStr))
  p.daemon = True
  p.start()
  procs.append(p)

source = ['hi','there','how','are','you','doing']
for item in source:
  q.put(item)

for i in range(num_procs):
  q.put(None) # send termination sentinel, one for each process

while not q.empty(): # wait for processing to finish
  sleep(1)   # manage timeouts and status updates etc.
Hugh
Paul Smith
  • 2
    while not q.empty() is not a reliable way to know processing is finished; it only tells you that a worker has grabbed the last piece of work to be done. Frankly, with how you're improperly using the JoinableQueue, you don't need a JoinableQueue. If you choose not to use one, you wouldn't need the worker threads to flag task_done. The purpose of using such a queue is so you can join it, which is exactly what you want to do at the end of this program instead of waiting for the queue to be empty (see the sketch after these comments). – leetNightshade Nov 08 '12 at 22:42
  • Yes, with this method, the job finishes prematurely. – Forethinker Dec 11 '13 at 19:26
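
For illustration, a minimal sketch of the ending the comments suggest, reusing the worker, q, procs, source and num_procs from the template above (whose worker already calls task_done for every item and once for its sentinel); it replaces the final while not q.empty() polling loop and is essentially the pattern of the accepted answer:

for item in source:
  q.put(item)
q.join()  # blocks until every item has been marked done

for i in range(num_procs):
  q.put(None)  # one sentinel per worker
q.join()  # blocks until the sentinels have been consumed, too

for p in procs:
  p.join()  # the workers have exited their loops, so this returns promptly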
5

Here is a sentinel-free method for the relatively simple case where you put a number of tasks on a JoinableQueue, then launch worker processes that consume the tasks and exit once they read the queue "dry". The trick is to use JoinableQueue.get_nowait() instead of get(). get_nowait(), as the name implies, tries to get a value from the queue in a non-blocking manner and if there's nothing to be gotten then a queue.Empty exception is raised. The worker handles this exception by exiting.

Rudimentary code to illustrate the principle:

import multiprocessing as mp
from queue import Empty

def worker(q):
  while True:
    try:
      work = q.get_nowait()
      # ... do something with `work`
      q.task_done()
    except Empty:
      break # completely done

# main
worknum = 4
jq = mp.JoinableQueue()

# fill up the task queue
# let's assume `tasks` contains some sort of data
# that your workers know how to process
for task in tasks:
  jq.put(task)

procs = [ mp.Process(target=worker, args=(jq,)) for _ in range(worknum) ]
for p in procs:
  p.start()

for p in procs:
  p.join()

The advantage is that you do not need to put the "poison pills" on the queue so the code is a bit shorter.

IMPORTANT: In more complex situations where producers and consumers use the same queue in an "interleaved" manner and the workers may have to wait for new tasks to come along, the "poison pill" approach should be used. My suggestion above is for simple cases where the workers "know" that if the task queue is empty, then there's no point hanging around any more.
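
For illustration, a rough sketch of that interleaved, poison-pill setup; the producer/consumer names and the range(100) workload are made up for this example, not taken from the answer:

import multiprocessing as mp

def producer(q, items):
  for item in items:
    q.put(item)  # tasks arrive while the consumers are already running

def consumer(q):
  for item in iter(q.get, None):  # block for new work, stop on the sentinel
    # ... do something with `item` ...
    q.task_done()
  q.task_done()  # account for the sentinel

if __name__ == '__main__':
  q = mp.JoinableQueue()
  consumers = [mp.Process(target=consumer, args=(q,)) for _ in range(4)]
  for c in consumers:
    c.start()
  prod = mp.Process(target=producer, args=(q, range(100)))
  prod.start()
  prod.join()  # every task has been queued
  q.join()     # every task has been processed
  for c in consumers:
    q.put(None)  # one poison pill per consumer
  q.join()     # the pills have been consumed, so the consumers are exiting
  for c in consumers:
    c.join()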

András Aszódi
3

You have to clear the queue before joining the process, but q.empty() is unreliable.

The best way to clear the queue is to count the number of successful gets, or to loop until you receive a sentinel value, just as you would with a socket on a reliable network.
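
For illustration, a rough sketch of the counting idea, assuming the workers push exactly one result per task onto a separate result queue that the main process drains before joining them (all names here are made up for this example):

import multiprocessing

def worker(inq, outq):
  for item in iter(inq.get, None):  # stop on the sentinel
    outq.put(item * 2)  # exactly one result per input item

if __name__ == '__main__':
  inq = multiprocessing.Queue()
  outq = multiprocessing.Queue()
  tasks = range(20)
  num_procs = 4
  procs = [multiprocessing.Process(target=worker, args=(inq, outq))
           for _ in range(num_procs)]
  for p in procs:
    p.start()
  for t in tasks:
    inq.put(t)
  for p in procs:
    inq.put(None)  # one sentinel per worker
  results = [outq.get() for _ in tasks]  # count the gets: one per task, no q.empty() needed
  for p in procs:
    p.join()  # safe now: the result queue has been drained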

Cees Timmerman
1

The code below may not be very relevant, but I'm posting it for your comments and feedback so we can learn together. Thank you!

import multiprocessing

def boss(q,nameStr):
  source = range(1024)
  for item in source:
    q.put(nameStr+' '+str(item))
  q.put(None) # termination sentinel: one per boss, which matches one per worker below

def worker(q,nameStr):
  while True:
     item = q.get()
     if item is None: # detect sentinel
       break
     print '%s processed %s' % (nameStr,item) # do something useful

q = multiprocessing.Queue()

procs = []

num_procs = 4
for i in range(num_procs):
  nameStr = 'ID_'+str(i)
  p = multiprocessing.Process(target=worker, args=(q,nameStr))
  procs.append(p)
  p = multiprocessing.Process(target=boss,   args=(q,nameStr))
  procs.append(p)

for j in procs:
  j.start()
for j in procs:
  j.join()
Fan