
Suppose I have the following multiprocessing structure:

import multiprocessing as mp
def worker(working_queue, output_queue):
    while True:
        if working_queue.empty() == True:
            break 
        else:
            picked = working_queue.get()
            res_item = "Number " + str(picked)
            output_queue.put(res_item)
    return

if __name__ == '__main__':
    static_input = xrange(100)    
    working_q = mp.Queue()
    output_q = mp.Queue()
    results_bank = []
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(2)]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    results_bank = []
    while True:
       if output_q.empty() == True:
           break
       results_bank.append(output_q.get_nowait())
    if len(results_bank) == len(static_input):
        print "Good run"
    else:
        print "Bad run"

My question: How would I 'batch' write my results to a single file while the working_queue is still 'working' (or at least, not finished)?

Note: My actual data structure is not sensitive to unordered results relative to inputs (despite my example using integers).

Also, I think that batch/set writing from the output queue is better practice than writing from the growing results-bank object, but I am open to solutions relying on either approach. I am new to multiprocessing, so I am unsure of the best practice or the most efficient solution(s) for this.

DV Hughes
  • @martineau thanks for clarifying 'batch'. I was about to do the same myself. – DV Hughes Mar 28 '17 at 16:31
  • Seems like you could have a nested loop in `worker()` that kept going until the `working_queue` was emptied. Is that what you meant by "batch write"? – martineau Mar 28 '17 at 16:34
  • No, my data need is more "as results accumulate, write out." As in, say, write out the results in 'sets' of 5. From my understanding of your suggestion, that would write out results at the end of the working_queue altogether, which is basically equivalent to my 'growing object' (results bank) in the example above. Or did you mean to suggest that as the working_queue 'clears' or 'flushes' I can write out the results in sets? – DV Hughes Mar 28 '17 at 16:37
  • I meant the latter (flushes it by getting everything in it at that time). When you've accumulated a batch's worth, you can write them out. I think that's basically what @Messa is suggesting in his answer. – martineau Mar 28 '17 at 16:38
  • Oh, alright. As I said, I am new to multiprocessing so a bit unsure how the queues work just yet. I knew enough to put results in a dedicated queue but did not know if processes would 'jam' or race over one another if I tried to write from the output queue as it was still filling. – DV Hughes Mar 28 '17 at 16:42

2 Answers

1

If you wish to use mp.Process and mp.Queue, here is a way to process the results in batches. The main idea is in the writer function below:

import itertools as IT
import multiprocessing as mp
SENTINEL = None
static_len = 100

def worker(working_queue, output_queue):
    for picked in iter(working_queue.get, SENTINEL):
        res_item = "Number {:2d}".format(picked)
        output_queue.put(res_item)

def writer(output_queue, threshold=10):
    result_length = 0
    items = iter(output_queue.get, SENTINEL)
    for batch in iter(lambda: list(IT.islice(items, threshold)), []):
        print('\n'.join(batch))
        result_length += len(batch)
    state = 'Good run' if result_length == static_len else 'Bad run'
    print(state)

if __name__ == '__main__':
    num_workers = 2

    static_input = range(static_len)
    working_q = mp.Queue()
    output_q = mp.Queue()

    writer_proc = mp.Process(target=writer, args=(output_q,))
    writer_proc.start()

    for i in static_input:
        working_q.put(i)

    processes = [mp.Process(target=worker, args=(working_q, output_q)) 
                 for i in range(num_workers)]
    for proc in processes:
        proc.start()
        # Put SENTINELs in the Queue to tell the workers to exit their for-loop
        working_q.put(SENTINEL)
    for proc in processes:
        proc.join()

    output_q.put(SENTINEL)
    writer_proc.join()

When passed two arguments, iter expects a callable and a sentinel: iter(callable, sentinel). The callable (i.e. a function) gets called repeatedly until it returns a value equal to the sentinel. So

items = iter(output_queue.get, SENTINEL)

defines items to be an iterable which, when iterated over, will return items from output_queue until output_queue.get() returns SENTINEL.
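
As a quick, self-contained illustration of the two-argument form (a minimal sketch using a plain queue.Queue purely for brevity; it is not part of the answer's program):

import queue

q = queue.Queue()
for n in (1, 2, 3, None):   # None plays the role of the sentinel here
    q.put(n)

# q.get is called repeatedly; iteration stops as soon as it returns None
for item in iter(q.get, None):
    print(item)             # prints 1, 2, 3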

The for-loop:

for batch in iter(lambda: list(IT.islice(items, threshold)), []):

calls the lambda function repeatedly until it returns an empty list. Each call returns a list of at most threshold items taken from the iterable items. This is the idiom for "grouping into batches of n items without padding". See this post for more on this idiom.
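
To produce the batched file output the question asks about, the print in writer can be replaced with a file write. A minimal sketch of such a drop-in replacement, assuming a hypothetical output path results.txt (it reuses SENTINEL, static_len and the IT alias defined above):

def writer(output_queue, path='results.txt', threshold=10):
    result_length = 0
    items = iter(output_queue.get, SENTINEL)
    with open(path, 'w') as f:
        # Each batch is written and flushed as soon as `threshold` items
        # have accumulated, while the workers are still running.
        for batch in iter(lambda: list(IT.islice(items, threshold)), []):
            f.write('\n'.join(batch) + '\n')
            f.flush()
            result_length += len(batch)
    print('Good run' if result_length == static_len else 'Bad run')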


Note that it is not good practice to test working_q.empty(); it can lead to a race condition. For example, suppose the two worker processes are at the indicated lines when working_q has only one item left in it:

def worker(working_queue, output_queue):
    while True:
        if working_queue.empty() == True:        <-- Process-1
            break 
        else:
            picked = working_queue.get()         <-- Process-2
            res_item = "Number " + str(picked)
            output_queue.put(res_item)
    return

Suppose Process-1 calls working_queue.empty() while there is still one item in the queue, so it returns False. Then Process-2 calls working_queue.get() and takes that last item. When Process-1 reaches picked = working_queue.get(), it hangs, because there are no more items in the queue.

Therefore, use sentinels (as shown above) to signal explicitly when a for-loop or while-loop should stop, rather than checking queue.empty().
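
If for some reason sentinels are not an option and the queue is fully populated before the workers start (as in the question's code), a somewhat safer pattern than calling empty() and then get() is a timed get() that catches queue.Empty. This is only a sketch; sentinels remain the more robust choice:

import queue   # imported only for the queue.Empty exception

def worker(working_queue, output_queue):
    while True:
        try:
            # A timed get() never blocks forever; queue.Empty is raised
            # if no item arrives within the timeout.
            picked = working_queue.get(timeout=1)
        except queue.Empty:
            break
        output_queue.put("Number " + str(picked))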

unutbu
  • Could you please explain a bit further your code starting at 'for res_item in iter(output_queue.get, SENTINEL):' In particular, I am not sure why (or how) your 'if len(batch)>=threshold' statement is 'duplicated' in 'if len(batch)' ... It appears you're extending the results object regardless of whether the threshold has been reached or not? It's all good and well to print regardless of threshold, batch, etc. but it is crucial to understand when 'batch' length equals threshold so as to replace your 'print' statements with a write-to-file. – DV Hughes Mar 28 '17 at 18:05
  • @DVHughes: I've added some words of explanation above. Let me know if there is anything unclear. – unutbu Mar 28 '17 at 18:43
  • Alright! Thanks, I was thinking it was the 'residual' batch as you mentioned. Just wanted to be sure before adding any write statements. if len(batch) made it seem like you were testing that the batch was not empty.. but indeed you are testing if there's anything residual/left over. Same difference but it's good to be conceptually clear, especially for others over time as they happen across this question/thread. Thanks! This was really easy to understand and coherent with my queue-based approach to multiprocess in my example code. – DV Hughes Mar 28 '17 at 18:47
  • 1
    @DVHughes: I've changed the code above so that we iterate over batches instead of iterating over individual items. This avoids the issue of a residual batch and (big bonus) means [you don't have to repeat the code](https://en.wikipedia.org/wiki/Don%27t_repeat_yourself) that prints to the screen or writes to a file twice. – unutbu Mar 28 '17 at 19:07
  • Thanks, this was really clean logic and conceptually clear from the code. I think you improved your answer. The only other resource I was able to find on 'batch get' was something to this effect: http://stackoverflow.com/questions/41498614/multiprocessing-queue-batch-get-up-to-max-n-elements – DV Hughes Mar 28 '17 at 19:38
0

There is no "batch q.get" operation, but it is good practice to put/get a batch of items rather than one item at a time.

That is exactly what multiprocessing.Pool.map does with its chunksize parameter :)

For writing output as soon as possible, there is Pool.imap_unordered, which returns an iterable instead of a list:

import multiprocessing

def work(item):
    return "Number " + str(item)

# The __main__ guard lets child processes import this module safely
# (required on platforms that spawn rather than fork, e.g. Windows).
if __name__ == '__main__':
    static_input = range(100)
    chunksize = 10
    # Items are handed to the workers in chunks of `chunksize`; results
    # are yielded as soon as they are ready, in whatever order they finish.
    with multiprocessing.Pool() as pool:
        for out in pool.imap_unordered(work, static_input, chunksize):
            print(out)
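
To batch-write those results to a single file while the pool is still working (the original question), one possible sketch, assuming a hypothetical output path results.txt and a batch size of 10:

import itertools
import multiprocessing

def work(item):
    return "Number " + str(item)

def batches(iterable, size):
    # Return an iterator of lists of up to `size` items, until exhausted.
    it = iter(iterable)
    return iter(lambda: list(itertools.islice(it, size)), [])

if __name__ == '__main__':
    static_input = range(100)
    with multiprocessing.Pool() as pool, open('results.txt', 'w') as f:
        results = pool.imap_unordered(work, static_input, chunksize=10)
        for batch in batches(results, 10):
            f.write('\n'.join(batch) + '\n')
            f.flush()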
Messa