
I have a script that collects data from a database, filters it, and puts it into a list for further processing. I've split the entries in the database between several processes to make the filtering faster. Here's the snippet:

from multiprocessing import Process, Queue

def get_entry(pN,q,entries_indicies):

    ##collecting and filtering data
    q.put((address,page_text,))
    print("Process %d finished!" % pN)

def main():

    #getting entries
    data = []

    procs = []
    for i in range(MAX_PROCESSES):
        q = Queue()
        p = Process(target=get_entry,args=(i,q,entries_indicies[i::MAX_PROCESSES],))
        procs += [(p,q,)]
        p.start()       
    for i in procs:
        i[0].join()
        while not i[1].empty():
            #process returns a tuple (address,full data,)
            data += [i[1].get()]
    print("Finished processing database!")

    #More tasks
    #................

I've run it on Linux (Ubuntu 14.04) and it went totally fine. The problems start when I run it on Windows 7. The script gets stuck on i[0].join() for the 11th process out of 16 (which looks totally random to me). No error messages, nothing, it just freezes there. At the same time, the print("Process %d finished!" % pN) line is displayed for all processes, which means they all run to the end, so there should be no problem with the code of get_entry.

I tried commenting out the q.put line in the process function, and it all went through fine (though, of course, data ended up empty).

Does that mean that Queue is to blame here? Why does it make join() get stuck? Is it because of the internal Lock within the Queue? And if so, and if Queue renders my script unusable on Windows, is there some other way to pass the data collected by the processes to the data list in the main process?

Highstaker

2 Answers


I came up with an answer to the last part of my question myself.

I use a Manager instead:

from multiprocessing import Process, Manager

def get_entry(pN,q,entries_indicies):
    #processing

    # assignment to a manager list in another process doesn't work,
    # but appending/extending does (see the sketch at the end of this answer)
    q += result

def main():

    #blahblah

    #getting entries
    data = []

    procs = []
    for i in range(MAX_PROCESSES):
        manager = Manager()
        q = manager.list()
        p = Process(target=get_entry,args=(i,q,entries_indicies[i::MAX_PROCESSES],))
        procs += [(p,q,)]
        p.start()
    # input("Press enter when all processes finish")
    for i in procs:
        i[0].join()
        data += i[1]
    print("data", data)#debug
    print("Finished processing database!")

    #more stuff

Why the presence of a Queue makes join() freeze on Windows still remains a mystery to me, so that part of the question is still open.
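
To illustrate the append-vs-assignment comment in the snippet above, here is a minimal self-contained sketch (the worker functions and the values they add are hypothetical): rebinding the parameter inside the child only changes a local name, while += / extend() go through the list proxy and reach the parent.

from multiprocessing import Manager, Process

def worker_rebind(shared):
    # Rebinding the parameter only changes the local name inside the child;
    # the manager list held by the parent is left untouched.
    shared = shared + [1, 2, 3]

def worker_extend(shared):
    # += on a list proxy is forwarded to the manager process (it calls extend),
    # so the change is visible in the parent.
    shared += [1, 2, 3]

if __name__ == "__main__":
    manager = Manager()

    a = manager.list()
    p = Process(target=worker_rebind, args=(a,))
    p.start(); p.join()
    print(list(a))   # [] -- the assignment did not propagate

    b = manager.list()
    p = Process(target=worker_extend, args=(b,))
    p.start(); p.join()
    print(list(b))   # [1, 2, 3] -- appending/extending did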

Highstaker

As the docs say:

Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Note that a queue created using a manager does not have this issue. See Programming guidelines.

So, since multiprocessing.Queue is built on top of a pipe, if there are still items in the queue when you call .join(), you should consume them (simply .get() them) first to make the queue empty. Then call .close() and .join_thread() for each queue.
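
Here is a minimal sketch of that order of operations, restructured from the loop in the question (MAX_PROCESSES, the worker body, and the None sentinel are stand-ins added for illustration): each worker puts a sentinel when it is done, and the parent drains each queue before joining the corresponding process, then closes the queue.

from multiprocessing import Process, Queue

MAX_PROCESSES = 4          # stand-in for the constant from the question
SENTINEL = None            # marker each worker sends when it is done

def get_entry(pN, q, entries_indicies):
    # Stand-in for the real collecting/filtering: put one tuple per entry.
    for idx in entries_indicies:
        q.put(("address-%d" % idx, "page text %d" % idx))
    q.put(SENTINEL)        # tell the parent there is nothing more to read
    print("Process %d finished!" % pN)

if __name__ == "__main__":
    entries_indicies = list(range(20))
    data = []
    procs = []
    for i in range(MAX_PROCESSES):
        q = Queue()
        p = Process(target=get_entry, args=(i, q, entries_indicies[i::MAX_PROCESSES]))
        procs.append((p, q))
        p.start()

    # Drain every queue *before* joining, so no child is left blocked
    # while it tries to flush buffered items into the pipe.
    for p, q in procs:
        for item in iter(q.get, SENTINEL):
            data.append(item)
        p.join()
        q.close()
        q.join_thread()

    print("Collected %d entries" % len(data))

Draining first guarantees that the child's feeder thread has nothing left to flush, so join() cannot block on it.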

You can also refer to this answer.
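
For completeness, here is a sketch of the managed-queue variant mentioned in the quoted note, assuming the rest of the structure stays as in the question: swapping Queue() for manager.Queue() lets the original join-then-drain order work, because put() calls go to the manager's server process instead of being buffered in a pipe by a feeder thread in the child.

from multiprocessing import Process, Manager

def worker(pN, q):
    # Hypothetical stand-in for get_entry: push a single result tuple.
    q.put(("address-%d" % pN, "page text"))
    print("Process %d finished!" % pN)

if __name__ == "__main__":
    manager = Manager()
    data = []
    procs = []
    for i in range(4):
        q = manager.Queue()   # proxy-backed queue, no feeder thread in the child
        p = Process(target=worker, args=(i, q))
        procs.append((p, q))
        p.start()

    # With a managed queue the original order (join first, then drain) is safe.
    for p, q in procs:
        p.join()
        while not q.empty():
            data.append(q.get())

    print(data)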

GoingMyWay