7

I want to run parallel computation on some input data which is loaded from a file. (The file can be really big, so I use a generator for this.)

On a certain number of items, my code runs OK but above this threshold the program hangs (some of the worker processes do not end).

Any suggestions? (I am running this with python2.7, 8 CPUs; 5,000 lines still OK, 7,500 does not work.)

Firstly, you need an input file. Generate it in bash:

for i in {0..10000}; do echo -e "$i"'\r' >> counter.txt; done

Then, run this:

python2.7 main.py 100 counter.txt > run_log.txt

main.py:

#!/usr/bin/python2.7
import os, sys, signal, time
import Queue
import multiprocessing as mp

def eat_queue(job_queue, result_queue):
    """Eats input queue, feeds output queue
    """
    proc_name = mp.current_process().name
    while True:
        try:
            job = job_queue.get(block=False)
            if job == None:
                print(proc_name + " DONE")
                return
            result_queue.put(execute(job))
        except Queue.Empty:
            pass    

def execute(x):
    """Does the computation on the input data
    """
    return x*x

def save_result(result):
    """Saves results in a list
    """
    result_list.append(result)

def load(ifilename):
    """Generator reading the input file and
        yielding it row by row
    """
    ifile = open(ifilename, "r")
    for line in ifile:
        line = line.strip()
        num = int(line)
        yield (num)
    ifile.close()
    print("file closed".upper())

def put_tasks(job_queue, ifilename):
    """Feeds the job queue
    """
    for item in load(ifilename):
        job_queue.put(item)
    for _ in range(get_max_workers()):
        job_queue.put(None)

def get_max_workers():
    """Returns optimal number of processes to run
    """
    max_workers = mp.cpu_count() - 2
    if max_workers < 1:
        return 1
    return max_workers

def run(workers_num, ifilename):
    job_queue = mp.Queue()
    result_queue = mp.Queue()

    # decide how many processes are to be created
    max_workers = get_max_workers()
    print "processes available: %d" % max_workers
    if workers_num < 1 or workers_num > max_workers:
        workers_num = max_workers

    workers_list = []
    # a process for feeding job queue with the input file
    task_gen = mp.Process(target=put_tasks, name="task_gen",
                          args=(job_queue, ifilename))
    workers_list.append(task_gen)

    for i in range(workers_num):
        tmp = mp.Process(target=eat_queue, name="w%d" % (i+1),
                                      args=(job_queue, result_queue))
        workers_list.append(tmp)

    for worker in workers_list:
        worker.start()

    for worker in workers_list:
        worker.join()
        print "worker %s finished!" % worker.name

if __name__ == '__main__':
    result_list = []
    args = sys.argv
    workers_num = int(args[1])
    ifilename = args[2]
    run(workers_num, ifilename)
galapah
  • 379
  • 1
  • 2
  • 14

1 Answers1

8

This is because nothing in your code takes anything off result_queue. The behavior then depends on internal queue buffering details: if "not a lot" of data is waiting, everything appears fine, but if "a lot" of data is waiting, everything freezes. Not much more can be said, because it involves layers of internal magic ;-) But the docs do warn about it:

Warning

As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Note that a queue created using a manager does not have this issue. See Programming guidelines.

One easy way to repair that: First add

            result_queue.put(None)

before eat_queue() returns. Then add:

count = 0
while count < workers_num:
    if result_queue.get() is None:
        count += 1

before the main program .join()s the workers. That drains the result queue, and everything shuts down cleanly then.

BTW, this code is pretty bizarre:

while True:
    try:
        job = job_queue.get(block=False)
        if job == None:
            print(proc_name + " DONE")
            return
        result_queue.put(execute(job))
    except Queue.Empty:
        pass

Why are you doing non-blocking get()? This turns into a CPU-hog "busy loop" so long as the queue is empty. The primary point of .get() is to supply an efficient way to wait for work to show up. So:

while True:
    job = job_queue.get()
    if job is None:
        print(proc_name + " DONE")
        break
    else:
        result_queue.put(execute(job))
result_queue.put(None)

does the same thing, but far more efficiently.

Queue size caution

You didn't ask about this, but let's cover it before it bites you ;-) By default, there is no bound on a Queue's size. If, e.g., you add a billion items to the Queue, it will demand enough RAM to hold a billion items. So if your producer(s) can generate work items faster than your consumer(s) can process them, memory use can get out of hand quickly.

Fortunately, that's easy to repair: specify a maximum queue size. For example,

    job_queue = mp.Queue(maxsize=10*workers_num)
                         ^^^^^^^^^^^^^^^^^^^^^^^

Then job_queue.put(some_work_item) will block until consumers reduce the size of the queue to less than the maximum. This way you can process enormous problems with a queue that requires trivial RAM.

Community
  • 1
  • 1
Tim Peters
  • 67,464
  • 13
  • 126
  • 132
  • Thanks! Now, the program runs correctly. Thanks for all the suggestions. (I had actually read that part of the documentation while working on input and did not realize this in case of output queue.) Why I used `blocks=false`: I just compiled the scheme of the code from various code snippets. But I am not sure what it actually does - I am confused (even by docs in this case): if it is `false`, the child which is to read the input, does not wait for the next item coming? (If so what happens?) – galapah Dec 14 '13 at 00:12
  • 1
    `block=False` means the call returns immediately, regardless of whether anything was on the queue. In general, ignore "block" and "timeout" arguments. All this stuff is perfectly usable without them for the vast majority of tasks, and using them *can* get you into various kinds of trouble. It takes a lot of experience to know when you *need* a non-blocking or timed-out call - and even then most people don't ;-) – Tim Peters Dec 14 '13 at 01:13