
Sorry for the long code; I have tried to make it as simple as possible while keeping it reproducible.

In short, this Python script starts four processes that randomly distribute numbers into lists. Each process then puts its result into a multiprocessing.Queue().

import random
import multiprocessing
import numpy

def work(subarray, queue):
    # one bucket per possible random index
    result = [numpy.array([], dtype=numpy.uint64) for i in range(4)]

    # distribute every element of the subarray into a random bucket
    for element in numpy.nditer(subarray):
        index = random.randint(0, 3)
        result[index] = numpy.append(result[index], element)

    queue.put(result)
    print("after the queue.put")

jobs = []
queue = multiprocessing.Queue()

subarray = numpy.array_split(numpy.arange(1, 10001, dtype=numpy.uint64), 4)

for i in range(4):
    process = multiprocessing.Process(target=work, args=(subarray[i], queue))
    jobs.append(process)
    process.start()

for j in jobs:
    j.join()

print "the end"

All four processes reach the print("after the queue.put") line. However, the script never reaches the print("the end") line. Strangely enough, if I change the arange upper bound from 10001 to 1001, it does reach the end. What is happening?

Frederico Schardong
  • I can recreate this behaviour on python3.6. I also have no idea what is going on. This behaviour is very weird. A simple workaround would be to just collect the results from the queue at the end instead of join. – Jannick Aug 14 '18 at 20:30
  • @Jannick I didn't get the "at the end instead of join" part. Can you please elaborate? – Frederico Schardong Aug 14 '18 at 20:33

3 Answers


I will expand my comment into a short answer. Since I also do not understand the weird behavior, this is merely a workaround.

A first observation: the code runs to the end if the queue.put line is commented out, so the problem must be related to the queue. The results are actually added to the queue, so the problem lies in the interplay between the queue and join.

The following code works as expected:

import random
import multiprocessing
import numpy

def work(subarray, queue):
    result = [numpy.array([], dtype=numpy.uint64) for i in range(4)]

    for element in numpy.nditer(subarray):
        index = random.randint(0, 3)
        result[index] = numpy.append(result[index], element)

    queue.put(result)
    print("after the queue.put")


jobs = []
queue = multiprocessing.Queue()

subarray = numpy.array_split(numpy.arange(1, 15001, dtype=numpy.uint64), 4)


for i in range(4):
    process = multiprocessing.Process(target=work, args=(subarray[i], queue))
    jobs.append(process)
    process.start()

res = []
while len(res) < 4:
    # collect the four results; get() drains the queue, so the children's
    # feeder threads can flush and the processes can terminate on their own
    res.append(queue.get())

print("the end")
Jannick

Most of the child processes are blocking on the put call. The documentation for multiprocessing's Queue.put says it will

block if necessary until a free slot is available.

This can be avoided by adding a call to queue.get() before join, as sketched below.
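
As a minimal sketch (assuming the jobs list and queue from the question's script, not this answer's exact code), the end of the parent would become:

res = []
for _ in range(4):
    # reading the results first unblocks the children,
    # after which join() can return
    res.append(queue.get())

for j in jobs:
    j.join()

print("the end")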

Also, in multiprocessing code, please isolate the parent process with:

if __name__ == '__main__':
    # main code here

See: Compulsory usage of if __name__ == "__main__" in windows while using multiprocessing
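
Applied to the question's script, the guard would look roughly like this (a sketch; imports and the work body are as in the question):

def work(subarray, queue):
    ...  # same body as in the question

if __name__ == '__main__':
    # everything below only runs in the parent process
    jobs = []
    queue = multiprocessing.Queue()
    subarray = numpy.array_split(numpy.arange(1, 10001, dtype=numpy.uint64), 4)

    for i in range(4):
        process = multiprocessing.Process(target=work, args=(subarray[i], queue))
        jobs.append(process)
        process.start()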

hmad
  • I think put will only block if a maximum size in the queue has been set, which is not the case here. – Jannick Aug 14 '18 at 22:46

This is the reason, quoting the multiprocessing programming guidelines:

Joining processes that use queues

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the cancel_join_thread() method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.
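
This also explains why shrinking the arange from 10001 to 1001 helps: the smaller pickled results presumably fit into the pipe's buffer, so the feeder threads can finish even though nothing is read. For completeness, here is what the cancel_join_thread() escape hatch mentioned in the quote would look like in the question's work function; it is a sketch, not a recommendation, since it lets the child exit without flushing and unread data may be lost:

def work(subarray, queue):
    result = ...  # build the four lists as in the question
    queue.put(result)
    # allow this process to exit even if the parent never drains the queue;
    # buffered items that have not reached the pipe may be dropped
    queue.cancel_join_thread()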

hmad