7

The following code starts a pool of three processes to handle 20 worker calls:

import multiprocessing

def worker(nr):
    print(nr)

numbers = [i for i in range(20)]

if __name__ == '__main__':
    multiprocessing.freeze_support()
    pool = multiprocessing.Pool(processes=3)
    results = pool.map(worker, numbers)
    pool.close()
    pool.join()

Is there a way to start the processes in sequence (as opposed to having them all start at the same time), with a delay inserted between each process start?

Without a Pool, I would call multiprocessing.Process(target=worker, args=(nr,)).start() in a loop, starting the processes one after the other and inserting the delay as needed. I find Pool extremely useful, though (together with the map call), so I would be glad to keep it if possible.

Velimir Mlaker
WoJ
  • why do you want to do this? – acushner Sep 15 '15 at 15:04
  • @acushner: as an example, I have an API which I need to call 200 times. The constraints of the API are that each call lasts about 5 minutes (this changes between calls). I can have 10 calls running simultaneously and each call must be started with at least a 5-second delay after the previous call has ended. I can just add a 5-second sleep at the beginning of my worker - this works fine along the way **except** for the very start when the 10 parallel calls are launched at the same time. This is why creating the `Pool` in a sequence (with the 5-second delay) would solve the issue. – WoJ Sep 16 '15 at 08:08
  • Cannot edit my previous comment, the second sentence should be: *The constraints of the API are that each call lasts about 5 minutes (this changes between calls) **,** I can have 10 calls (...)*. Note the comma instead of a full stop (there are three constraints) – WoJ Sep 16 '15 at 15:17
  • are you using python 2 or python 3? – acushner Sep 16 '15 at 15:47
  • @acushner: python 3.4.3 – WoJ Sep 16 '15 at 15:48

4 Answers

5

According to the documentation, no such control over pooled processes exists. You could, however, simulate it with a lock:

import multiprocessing
import time

lock = multiprocessing.Lock()

def worker(nr):
    lock.acquire()
    time.sleep(0.100)
    lock.release()
    print(nr)

numbers = [i for i in range(20)]

if __name__ == '__main__':
    multiprocessing.freeze_support()
    pool = multiprocessing.Pool(processes=3)
    results = pool.map(worker, numbers)
    pool.close()
    pool.join()

Your 3 processes will still start simultaneously, and you have no control over which of them executes the callback first. But at least you get your delay: each worker effectively "starts" (really, continues) at the designated interval.

Amendment resulting from the discussion below:

Note that on Windows it's not possible to inherit a lock from a parent process. Instead, you can use multiprocessing.Manager().Lock() to communicate a global lock object between processes (with additional IPC overhead, of course). The global lock object needs to be initialized in each process, as well. This would look like:

import multiprocessing
import time
from datetime import datetime as dt

def worker(nr):
    glock.acquire()
    print('started job: {} at {}'.format(nr, dt.now()))
    time.sleep(1)
    glock.release()
    print('ended   job: {} at {}'.format(nr, dt.now()))

numbers = [i for i in range(6)]

def init(lock):
    global glock
    glock = lock

if __name__ == '__main__':
    multiprocessing.freeze_support()
    lock = multiprocessing.Manager().Lock()
    pool = multiprocessing.Pool(processes=3, initializer=init, initargs=(lock,))
    results = pool.map(worker, numbers)
    pool.close()
    pool.join()
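
Two of the comments on this answer suggest using `with` for the lock and for the pool, so that the lock is released and the pool is cleaned up even if a worker raises. The following is a minimal sketch of that variant, not the answerer's own code. The `run_jobs` helper, the `delay` argument passed through the initializer, and the returned start times are assumptions introduced here for illustration.

```python
import multiprocessing
import time


def init(lock, delay):
    # Runs once in every pool process: keep the shared lock and the
    # delay in module-level globals so that worker() can reach them.
    global glock, gdelay
    glock, gdelay = lock, delay


def worker(nr):
    # Only one process can hold the lock at a time, so consecutive
    # starts are spaced by roughly the delay. "with" releases the lock
    # even if the body raises.
    with glock:
        started = time.time()
        time.sleep(gdelay)
    # The real work would go here, outside the lock.
    return nr, started


def run_jobs(numbers, processes=3, delay=1.0):
    # Hypothetical helper: a Manager lock can be handed to the pool
    # processes on Windows as well as on Linux.
    with multiprocessing.Manager() as manager:
        lock = manager.Lock()
        with multiprocessing.Pool(processes=processes, initializer=init,
                                  initargs=(lock, delay)) as pool:
            return pool.map(worker, numbers)


if __name__ == '__main__':
    multiprocessing.freeze_support()
    for nr, started in run_jobs(range(6), processes=3, delay=0.5):
        print('job {} started at {:.2f}'.format(nr, started))
```

Note that leaving the `with Pool(...)` block terminates the pool rather than closing and joining it, which is fine here because `map` has already returned every result.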
Velimir Mlaker
  • I tried your code and added some timing (http://pastebin.com/aFfQgBSD). The results are the same whether the `lock.acquire()` and `lock.release()` are present or commented out (output with the lock present: http://pastebin.com/3gu70D6q and with the two lines commented out: http://pastebin.com/MgB6D8E5) – WoJ Sep 16 '15 at 10:36
  • My code works as intended. You need to change your pastebin example in order to observe the correct behavior: your workers need to acquire the lock **before** printing the first message. Otherwise, all processes get to print their timestamp before any one of them acquires the lock. Then finally, one of them acquires it, and so on... What you really want to do is http://pastebin.com/a2WR0C3N. – Velimir Mlaker Sep 16 '15 at 15:04
  • Well, the first three jobs still print 'ended...' at the same time in your pastebin code, which means that they have not done that line sequentially. They have started at the same time [some magic with locking and waiting] and printed the 'ended ...' all together. What I would like is to have the 'ended ...' part of the workers be done one after the other. Also please see my example following the comment right after the question - for a real-life application. – WoJ Sep 16 '15 at 15:15
  • It works just fine, here's the output of my code: http://pastebin.com/rV7w731i . Can you pastebin your output? – Velimir Mlaker Sep 16 '15 at 15:21
  • Thanks for keeping up :) I increased the sleep to 5 s to make the shifts more visible (http://pastebin.com/9QvkZ8fQ). As you can see 3 workers start at 8, all of them end at 13, new start at 13 and end at 18. This means that they are not sequential, separated by a 5 seconds delay. – WoJ Sep 16 '15 at 15:25
  • Let's try to get to the bottom of this :). How does this work for you http://pastebin.com/kiWpADQG ? Now we're using a pool initializer to set a global lock object created via `multiprocessing.Manager` (similar to this answer: http://stackoverflow.com/a/8277123/1510289). An explanation of relevant Windows vs. Linux differences can be found at http://stackoverflow.com/a/24787346/1510289 . – Velimir Mlaker Sep 16 '15 at 15:41
  • aaannd .. this one worked - THANKS. Could you please move it to your answer? I will remove my comments to keep a clean (accepted of course) answer. Thanks again. – WoJ Sep 16 '15 at 15:45
  • Great, glad that worked for you! Initially I wasn't aware you're on a Windows box. – Velimir Mlaker Sep 16 '15 at 17:15
  • Please don't use lock.acquire/release. Use "with lock". In your code, if an exception happens after the acquire, you failed the job interview. – user48956 Sep 27 '17 at 05:41
  • Similarly, close and join should be in a finally: block, or you should use "with Pool() as pool:" – user48956 Sep 27 '17 at 05:43
0

Couldn't you do something simple like this:

from multiprocessing import Process
from time import sleep

def f(n):
    print('started job: ' + str(n))
    sleep(3)
    print('ended job: ' + str(n))

if __name__ == '__main__':
    for i in range(0,100):
        p = Process(target=f, args=(i,))
        p.start()
        sleep(1)

Result

started job: 0
started job: 1
started job: 2
ended job: 0
started job: 3
ended job: 1
started job: 4
ended job: 2
started job: 5
taesu
  • This is the solution I mentioned in the last part of my question. I wanted to understand if it was possible to have a control over the way `Pool` processes start. – WoJ Sep 15 '15 at 07:26
  • Beside that the code above would start 100 processes in parallel, while I am limiting to 3 processes and 20 workers consumed by the first one available. This can be fixed by a `Queue`, though. – WoJ Sep 16 '15 at 10:38
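
The `Queue` idea from the last comment could look roughly like the sketch below: a fixed number of processes is started one after the other with a delay, and each one pulls job numbers from a shared queue until it reads a stop marker. This is only an illustration under stated assumptions; `consume`, `run_staggered`, and the `None` stop marker are names and conventions introduced here, not part of the answer.

```python
import multiprocessing
import time


def worker(nr):
    # Stand-in for the real work.
    return nr


def consume(tasks, results):
    # iter(callable, sentinel) keeps calling tasks.get() until it
    # returns None, which is used here as the stop marker.
    for nr in iter(tasks.get, None):
        results.put(worker(nr))


def run_staggered(numbers, processes=3, delay=1.0):
    # numbers is assumed to be a sequence (it is iterated twice).
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()
    for nr in numbers:
        tasks.put(nr)
    for _ in range(processes):
        tasks.put(None)  # one stop marker per process

    procs = []
    for i in range(processes):
        if i:
            time.sleep(delay)  # pause between process starts
        p = multiprocessing.Process(target=consume, args=(tasks, results))
        p.start()
        procs.append(p)

    # Drain the results before joining so that no child blocks while
    # flushing its queue.
    done = [results.get() for _ in numbers]
    for p in procs:
        p.join()
    return done


if __name__ == '__main__':
    multiprocessing.freeze_support()
    print(sorted(run_staggered(list(range(20)), processes=3, delay=0.5)))
```

Results arrive in completion order, so they are sorted before printing. With work as trivial as this stand-in, the first process may finish every job before the second one has started.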
0

Could you try defining a function that yields your values slowly?

import time

def get_numbers_on_delay(numbers, delay):
    for i in numbers:
        yield i
        time.sleep(delay)

and then:

results = pool.map(worker, get_numbers_on_delay(numbers, 5))

I haven't tested it, so I'm not sure, but give it a shot.
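
One caveat that seems worth checking: in CPython, `Pool.map` converts an iterable without a length into a list before it dispatches anything, so a generator passed to `map` would be exhausted, sleeps included, before the first task is submitted. `Pool.imap` is documented as a lazier version of `map` and, as far as I can tell, pulls items from the iterable as it submits them. The sketch below therefore uses `imap`. The `worker` that returns a timestamp is an assumption added here only to make the spacing visible.

```python
import multiprocessing
import time


def get_numbers_on_delay(numbers, delay):
    # Same generator as above: hand out one value, then pause.
    for i in numbers:
        yield i
        time.sleep(delay)


def worker(nr):
    # Return the time at which the task began so the spacing can be seen.
    return nr, time.time()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    with multiprocessing.Pool(processes=3) as pool:
        delayed = get_numbers_on_delay(range(6), 0.5)
        for nr, started in pool.imap(worker, delayed):
            print('job {} started at {:.2f}'.format(nr, started))
```

With this approach every task is spaced by the delay, not only the first few, which may or may not be what is wanted.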

acushner
0

I couldn't get the locking answer to work for some reason, so I implemented it this way. I realize the question is old, but maybe someone else has the same problem.

It spawns all the processes at once, as the locking solution does, but each process sleeps before starting work for a time based on the number in its process name.

from multiprocessing import current_process
from re import search
from time import sleep

def worker(nr):
    # Pool worker names end in a number, e.g. SpawnPoolWorker-1
    process_number = search(r'\d+', current_process().name).group()
    time_between_workers = 5
    sleep(time_between_workers * int(process_number))
    # do your work here

Since the names given to the processes seem to be unique and incremental, this grabs the number of the process and sleeps based on that. SpawnPoolWorker-1 sleeps 1 * 5 seconds, SpawnPoolWorker-2 sleeps 2 * 5 seconds etc.
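
A possible variation, offered only as a sketch: if the sleep is placed in the pool's `initializer` rather than in the worker function, each process pauses once when it starts and not before every task. `worker_index` and `stagger_start` are hypothetical helper names, and the code assumes the worker names carry a number as described above.

```python
import re
import time
from multiprocessing import Pool, current_process, freeze_support


def worker_index(name):
    # 'SpawnPoolWorker-2' -> 2; returns 0 if the name has no number.
    match = re.search(r'\d+', name)
    return int(match.group()) if match else 0


def stagger_start(time_between_workers):
    # Pool initializer: runs once in each worker process, before it
    # picks up its first task.
    index = worker_index(current_process().name)
    time.sleep(time_between_workers * index)


def worker(nr):
    # Return the time at which the task began so the stagger is visible.
    return nr, time.time()


if __name__ == '__main__':
    freeze_support()
    with Pool(processes=3, initializer=stagger_start,
              initargs=(0.5,)) as pool:
        for nr, started in pool.map(worker, range(6)):
            print('job {} started at {:.2f}'.format(nr, started))
```

The numbers in the names come from a counter in the parent process, so they may not start at 1 if other processes were created earlier.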

raecer