27

I want to limit resource access in child processes, for example to throttle HTTP downloads, disk I/O, etc. How can I achieve this by extending this basic code?

Please share some basic code examples.

import multiprocessing

pool = multiprocessing.Pool(multiprocessing.cpu_count())
while job_queue.is_jobs_for_processing():
    for job in job_queue.pull_jobs_for_processing():
        pool.apply_async(do_job, (job,), callback=callback)
pool.close()
pool.join()
Chameleon
  • Do you want to limit resource access with a `Lock` or a `Semaphore`? Any reason not to just use `multiprocessing.Lock` / `multiprocessing.Semaphore`? – dano Feb 23 '15 at 00:19
  • @dano How do I pass a multiprocessing.Lock() or Semaphore() to the pool? What are the options for sharing a lock globally? – Chameleon Feb 23 '15 at 22:08
  • The need for limiting resource access does not imply any synchronisation need for the process pool, but rather for the worker tasks. Why don't you explain exactly what you want to accomplish? – Michael Foukarakis Feb 25 '15 at 12:31
  • @MichaelFoukarakis The **why** is not important; the **how** is. But to answer the **why**: random I/O is slower than sequential I/O - see the statistics at http://goo.gl/TbC2xp. Memcache behaves differently from a hard disk, and a hard disk differently from flash memory (often called a disk, though it is not one) or a WWW server - some of these need a semaphore and some do not. In any case, I need to learn the global semaphore pattern for multiprocessing in Python, as do many other people. – Chameleon Feb 25 '15 at 12:58
  • @MichaelFoukarakis A WWW server needs a semaphore so that we stay polite and do not take the site down with an enormous number of parallel requests - the limit comes not from the design but from Internet etiquette. – Chameleon Feb 25 '15 at 13:03

2 Answers

35

Use the `initializer` and `initargs` arguments when creating a pool to define a global in all the child processes.

For instance:

from multiprocessing import Pool, Lock
from time import sleep

def do_job(i):
    "The greater i is, the shorter the function waits before returning."
    with lock:  # serialize the jobs on the shared lock
        sleep(1 - (i / 10.))
        return i

def init_child(lock_):
    "Run once in each worker: store the lock from the parent as a global."
    global lock
    lock = lock_

def main():
    lock = Lock()  # handed to every worker via initargs
    poolsize = 4
    with Pool(poolsize, initializer=init_child, initargs=(lock,)) as pool:
        results = pool.imap_unordered(do_job, range(poolsize))
        print(list(results))

if __name__ == "__main__":
    main()

This code prints the numbers 0-3 in ascending order (the order in which the jobs were submitted) because the lock forces the jobs to run one at a time. Comment out the `with lock:` line to see it print the numbers in descending order, since a job with a larger i sleeps for less time and so finishes sooner.

This solution works on both Windows and unix. However, because processes can be created by forking on unix systems, unix code only needs to declare the global variable at module scope: the child process gets a copy of the parent's memory, which includes a lock object that still works. The initializer therefore isn't strictly needed there, though it can help document how the code is intended to work. When multiprocessing is able to create processes by forking, the following also works (a portability sketch follows it).

from multiprocessing import Pool, Lock
from time import sleep

lock = Lock()  # inherited by each worker when the child process is forked

def do_job(i):
    "The greater i is, the shorter the function waits before returning."
    with lock:
        sleep(1-(i/10.))
        return i

def main():
    poolsize = 4
    with Pool(poolsize) as pool:
        results = pool.imap_unordered(do_job, range(poolsize))
        print(list(results))

if __name__ == "__main__":
    main()
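
Since the module-level lock above is only shared when processes are forked, here is a minimal sketch (an addition, assuming Python 3.4+ for `multiprocessing.get_context`) that forces the `spawn` start method - the Windows default - and falls back to the initializer pattern from the first example, so the same code behaves identically on every platform:

from multiprocessing import get_context

def init_child(lock_):
    # Run once per worker: store the lock from the parent as a global.
    global lock
    lock = lock_

def do_job(i):
    with lock:
        return i

if __name__ == "__main__":
    ctx = get_context("spawn")  # mimic Windows process creation on any OS
    lock = ctx.Lock()
    with ctx.Pool(2, initializer=init_child, initargs=(lock,)) as pool:
        print(sorted(pool.imap_unordered(do_job, range(4))))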
Dunes
  • I was studying the second example, but it looks like `lock = Lock()` is not global, since it is not passed in by the master - am I wrong? – Chameleon Mar 01 '15 at 20:16
  • The first example could have the same problem - I will test it. The code looks nice, but I think that under Windows a child process will know nothing about its parent. – Chameleon Mar 01 '15 at 20:19
  • In the second example, when a child process is created by the pool (on unix), the entire parent process's memory is copied to the child process, which includes the lock object. Since you are using Windows, do not use the second example. – Dunes Mar 01 '15 at 20:21
8

Use a global semaphore and acquire it whenever you access a resource. For example:

import multiprocessing
from time import sleep

semaphore = multiprocessing.Semaphore(2)  # at most two holders at a time

def do_job(job_id):
    with semaphore:  # only two workers can hold the semaphore at once
        sleep(1)
    print("Finished job")

def main():
    pool = multiprocessing.Pool(6)
    for job_id in range(6):
        print("Starting job")
        pool.apply_async(do_job, [job_id])
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()

This program finishes only two jobs every second because the other worker processes are blocked waiting for the semaphore.
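
Applied to the question's original use case of throttling HTTP downloads, a hypothetical sketch along the same lines (the URLs and the download_one helper are made-up placeholders, and like the example above it relies on the module-level semaphore being inherited by forked workers on unix):

import multiprocessing
from urllib.request import urlopen

semaphore = multiprocessing.Semaphore(2)  # at most two downloads in flight

def download_one(url):
    with semaphore:  # stay polite: cap concurrent requests to the server
        with urlopen(url) as response:
            return url, len(response.read())

def main():
    urls = ["http://example.com/a", "http://example.com/b"]  # placeholders
    with multiprocessing.Pool(6) as pool:
        for url, size in pool.imap_unordered(download_one, urls):
            print(url, size, "bytes")

if __name__ == "__main__":
    main()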

MarcelSimon