4

I realize I could use the Pool class and probably get what I need, but I want a little finer control over my problem. I have more jobs than I have processors, so I don't want them all to run at once.
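(For reference, the Pool version I'm passing on would look roughly like this. It's just a sketch: transfer_pair is a wrapper I'd have to add, since Pool.map hands each worker a single argument.)

import os
from multiprocessing import Pool, cpu_count

def transfer_pair(pair):
    # unpack a (src_dir, dst_dir) tuple for Pool.map
    transfer_directory(*pair)

pairs = []
for dir_name in directories:
    src_dir = os.path.join(top_level, dir_name)
    dst_dir = src_dir.replace(args.src_dir, args.target_dir)
    pairs.append((src_dir, dst_dir))

pool = Pool(processes=cpu_count())  # Pool defaults to cpu_count() anyway
pool.map(transfer_pair, pairs)      # blocks until every directory is done
pool.close()
pool.join()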

For instance, my first attempt simply started one Process per directory:

import os
from multiprocessing import Process

for dir_name in directories:
    src_dir = os.path.join(top_level, dir_name)
    dst_dir = src_dir.replace(args.src_dir, args.target_dir)
    p = Process(target=transfer_directory, args=(src_dir, dst_dir))
    p.start()

However, if I have more than 16 directories, I will start more jobs than I have processors. Here is my solution, which is a real hack.

import os
import time
from multiprocessing import Process, cpu_count

jobs = []
for dir_name in directories:
    src_dir = os.path.join(top_level, dir_name)
    dst_dir = src_dir.replace(args.src_dir, args.target_dir)
    p = Process(target=transfer_directory, args=(src_dir, dst_dir))
    jobs.append(p)

alive_jobs = []
while jobs:
    if len(alive_jobs) >= cpu_count():
        # every slot is taken: wait, then prune workers that have finished
        time.sleep(5)
        print(alive_jobs)
        # iterate over a copy so removing entries doesn't skip elements
        for aj in alive_jobs[:]:
            if aj.is_alive():
                continue
            print("job {} removed".format(aj))
            alive_jobs.remove(aj)

        continue

    # there are free slots: start pending jobs until the slots fill up
    for job in jobs[:]:
        job.start()
        alive_jobs.append(job)
        print(alive_jobs)
        jobs.remove(job)
        if len(alive_jobs) >= cpu_count():
            break

Is there a better solution using the built in tools?

jwillis0720
  • What finer control does this give you? – Peter Wood Apr 20 '15 at 21:00
  • Well if anyone has a solution to being able to keyboard interrupt the Pool module without it freezing and having to shut down the terminal – jwillis0720 Apr 20 '15 at 21:02
  • Related: [Keyboard Interrupts with python's multiprocessing Pool](http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool). – Brendan Long Apr 20 '15 at 21:06
  • Also: http://stackoverflow.com/questions/14579474/multiprocessing-pool-spawning-new-childern-after-terminate-on-linux-python2-7 – Brendan Long Apr 20 '15 at 21:10

1 Answer

5

I want to share my idea here: create a number of processes equal to cpu_count(), use a Queue to store all your directories, and pass the Queue into your transfer_directory method; each process takes the next dir_name from the Queue as soon as it finishes the previous one. A draft looks like this:

import multiprocessing
import os
import queue  # for queue.Empty, which Queue.get raises on timeout

NUM_OF_PROCESSES = multiprocessing.cpu_count()
TIME_OUT_IN_SECONDS = 60

# load every directory name into a shared queue
my_queue = multiprocessing.Queue()
for dir_name in directories:
    my_queue.put(dir_name)

# create as many worker processes as there are CPUs
processes = [multiprocessing.Process(target=transfer_directory, args=(my_queue,))
             for x in range(NUM_OF_PROCESSES)]

# start the workers
for p in processes:
    p.start()

# block the calling thread until every worker exits
for p in processes:
    p.join()



def transfer_directory(my_queue):
    """Process directory names from the queue until it stays empty."""
    while True:
        try:
            dir_name = my_queue.get(timeout=TIME_OUT_IN_SECONDS)
        except queue.Empty:
            break  # no work arrived for a full timeout; let the worker exit
        src_dir = os.path.join(top_level, dir_name)
        dst_dir = src_dir.replace(args.src_dir, args.target_dir)
        # ... do the actual transfer from src_dir to dst_dir here ...
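Note that my_queue.get(timeout=TIME_OUT_IN_SECONDS) raises queue.Empty if nothing arrives within the timeout, and the draft uses that as the exit signal: once the queue stays empty for a full minute, each worker breaks out of its loop and shuts down on its own.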

Edit: It also works efficiently for reading a large file. I struggled for a while with how to read a huge file (more than 10 million lines) using multiprocessing. In the end I start a single producer process that just reads lines and puts them into a_queue, and then start multiple consumer processes that take lines from a_queue and do the time-consuming work; that setup works properly for me. A sketch of the pattern follows.
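A minimal sketch of that producer/consumer setup (huge_file.txt and handle_line are placeholders for the real file and the real per-line work):

import multiprocessing

def producer(a_queue):
    # read the big file line by line and feed the queue
    with open("huge_file.txt") as f:      # placeholder input file
        for line in f:
            a_queue.put(line)
    # one sentinel per consumer so each one knows when to stop
    for _ in range(multiprocessing.cpu_count()):
        a_queue.put(None)

def consumer(a_queue):
    while True:
        line = a_queue.get()
        if line is None:                  # sentinel: no more lines
            break
        handle_line(line)                 # placeholder time-consuming work

if __name__ == "__main__":
    a_queue = multiprocessing.Queue()
    reader = multiprocessing.Process(target=producer, args=(a_queue,))
    reader.start()
    workers = [multiprocessing.Process(target=consumer, args=(a_queue,))
               for _ in range(multiprocessing.cpu_count())]
    for w in workers:
        w.start()
    reader.join()
    for w in workers:
        w.join()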

Haifeng Zhang