1

I am following the principles laid down in this post to safely output the results which will eventually be written to a file. Unfortunately, the code only print 1 and 2, and not 3 to 6.

import os
import argparse
import pandas as pd
import multiprocessing
from multiprocessing import Process, Queue
from time import sleep


def feed(queue, parlist):

    for par in parlist:
            queue.put(par)
    print("Queue size", queue.qsize())

def calc(queueIn, queueOut):

    while True:
        try:
            par=queueIn.get(block=False)
            res=doCalculation(par)
            queueOut.put((res))
            queueIn.task_done()
        except:
            break

def doCalculation(par):

    return par

def write(queue):
    while True:
        try:
            par=queue.get(block=False)
            print("response:",par)
        except:
            break


if __name__ == "__main__":

    nthreads = 2
    workerQueue = Queue()
    writerQueue = Queue()

    considerperiod=[1,2,3,4,5,6]

    feedProc = Process(target=feed, args=(workerQueue, considerperiod))
    calcProc = [Process(target=calc, args=(workerQueue, writerQueue)) for i in range(nthreads)]
    writProc = Process(target=write, args=(writerQueue,))

    feedProc.start()
    feedProc.join()
    for p in calcProc:
        p.start()

    for p in calcProc:
        p.join()
    writProc.start()
    writProc.join()

On running the code it prints,

$ python3 tst.py
Queue size 6
response: 1
response: 2

Also, is it possible to ensure that the write function always outputs 1,2,3,4,5,6 i.e. in the same order in which the data is fed into the feed queue?

hansaplast
  • 11,007
  • 2
  • 61
  • 75
trumee
  • 393
  • 1
  • 4
  • 11

1 Answers1

0

The error is somehow with the task_done() call. If you remove that one, then it works, don't ask me why (IMO that's a bug). But the way it works then is that the queueIn.get(block=False) call throws an exception because the queue is empty. This might be just enough for your use case, a better way though would be to use sentinels (as suggested in the multiprocessing docs, see last example). Here's a little rewrite so your program uses sentinels:

import os
import argparse
import multiprocessing
from multiprocessing import Process, Queue
from time import sleep

def feed(queue, parlist, nthreads):
    for par in parlist:
        queue.put(par)
    for i in range(nthreads):
        queue.put(None)
    print("Queue size", queue.qsize())

def calc(queueIn, queueOut):
    while True:
        par=queueIn.get()
        if par is None:
            break
        res=doCalculation(par)
        queueOut.put((res))

def doCalculation(par):
    return par

def write(queue):
    while not queue.empty():
        par=queue.get()
        print("response:",par)


if __name__ == "__main__":
    nthreads = 2
    workerQueue = Queue()
    writerQueue = Queue()

    considerperiod=[1,2,3,4,5,6]

    feedProc = Process(target=feed, args=(workerQueue, considerperiod, nthreads))
    calcProc = [Process(target=calc, args=(workerQueue, writerQueue)) for i in range(nthreads)]
    writProc = Process(target=write, args=(writerQueue,))

    feedProc.start()
    feedProc.join()
    for p in calcProc:
        p.start()

    for p in calcProc:
        p.join()
    writProc.start()
    writProc.join()

A few things to note:

  • the sentinel is putting a None into the queue. Note that you need one sentinel for every worker process.
  • for the write function you don't need to do the sentinel handling as there's only one process and you don't need to handle concurrency (if you would do the empty() and then get() thingie in your calc function you would run into a problem if e.g. there's only one item left in the queue and both workers check empty() at the same time and then both want to do get() and then one of them is locked forever)
  • you don't need to put feed and write into processes, just put them into your main function as you don't want to run it in parallel anyway.

how can I have the same order in output as in input? [...] I guess multiprocessing.map can do this

Yes map keeps the order. Rewriting your program into something simpler (as you don't need the workerQueue and writerQueue and adding random sleeps to prove that the output is still in order:

from multiprocessing import Pool
import time
import random    

def calc(val):
    time.sleep(random.random())
    return val

if __name__ == "__main__":
    considerperiod=[1,2,3,4,5,6]
    with Pool(processes=2) as pool:
        print(pool.map(calc, considerperiod))
hansaplast
  • 11,007
  • 2
  • 61
  • 75
  • Thanks for the response. This is very helpful. As mentioned in the bottom of the OP how can i get the same order of output as input. I inserted `sleep(randint(1, 5))` in doCalculation function just to introduce some complexity. The outcome was a different order that the initial feed. – trumee Dec 25 '17 at 20:10
  • @trumee in a multiprocessing environment you can never say in which order it is processed. In fact that's exactly the point that one process who is faster done with his process is taking up the next item already. Why would you want to have it ordered? – hansaplast Dec 25 '17 at 22:30
  • The output of each of the thread is to be written to a file, hence the order is important. I guess the Pool.map function can do this. – trumee Dec 26 '17 at 23:59
  • @trumee: I have now added a section with more explanation why parallelism and order don't go together and also how you could solve that for your usecase – hansaplast Dec 27 '17 at 08:26
  • @trumee upon further reading I found out that map indeed keeps the order, I've revised my answer now – hansaplast Dec 29 '17 at 12:24