
I am trying to use a worker Pool in Python using Process objects. Each worker (a Process) does some initialization (which takes a non-trivial amount of time), gets passed a series of jobs (ideally using map()), and returns something. No communication is necessary beyond that. However, I can't figure out how to use map() to call my workers' compute() function.

from multiprocessing import Pool, Process

class Worker(Process):
    def __init__(self):
        print 'Worker started'
        # do some initialization here
        super(Worker, self).__init__()

    def compute(self, data):
        print 'Computing things!'
        return data * data

if __name__ == '__main__':
    # This works fine
    worker = Worker()
    print worker.compute(3)

    # workers get initialized fine
    pool = Pool(processes = 4,
                initializer = Worker)
    data = range(10)
    # How to use my worker pool?
    result = pool.map(compute, data)

Is a job queue the way to go instead, or can I use map()?

Felix
  • All process objects are stateful. You might want to remove that word from the title. Also, `compute` is a method of a Worker; in the examples it's usually a completely stand-alone function. Why not write the compute function to simply include both initialization and processing? – S.Lott Jan 27 '12 at 19:48
  • Fair enough, thanks. The initialization takes a long time, so I only want to do it once per worker process. – Felix Jan 27 '12 at 20:14
  • You must want to emphasize the "gets passed a series of jobs" part of the question, since that wasn't obvious. – S.Lott Jan 27 '12 at 20:19

3 Answers


I would suggest that you use a Queue for this.

from multiprocessing import Process, Queue

class Worker(Process):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self.queue = queue

    def run(self):
        print('Worker started')
        # do some initialization here

        print('Computing things!')
        for data in iter(self.queue.get, None):
            # use data here, e.g.
            print(data * data)

Now you can start a pile of these, all getting work from a single queue:

request_queue = Queue()
for i in range(4):
    Worker(request_queue).start()
for data in the_real_source:
    request_queue.put(data)
# Sentinel objects to allow clean shutdown: 1 per worker.
for i in range(4):
    request_queue.put(None) 

That kind of thing should allow you to amortize the expensive startup cost across multiple workers.
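
Felix mentions in a comment below that he ended up pairing a job queue (input) with a result queue (output). A minimal sketch of that variant, assuming a result_queue that each Worker writes to; the result_queue name and the way results are collected are my own additions, not part of the answer above:

from multiprocessing import Process, Queue

class Worker(Process):
    def __init__(self, request_queue, result_queue):
        super(Worker, self).__init__()
        self.request_queue = request_queue
        self.result_queue = result_queue

    def run(self):
        # do the expensive initialization once, here
        for data in iter(self.request_queue.get, None):
            self.result_queue.put(data * data)

if __name__ == '__main__':
    request_queue = Queue()
    result_queue = Queue()
    workers = [Worker(request_queue, result_queue) for _ in range(4)]
    for w in workers:
        w.start()

    data = range(10)
    for item in data:
        request_queue.put(item)
    for _ in range(4):
        request_queue.put(None)  # one sentinel per worker

    results = [result_queue.get() for _ in data]  # arrival order, not input order
    for w in workers:
        w.join()
    print(results)

Note that results come back in whatever order the workers finish them, so if input order matters you would need to tag each job with an index.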

S.Lott
  • That's what I figured, thanks! I ended up using a job queue (input) and result queue (output) to synchronize everything. – Felix Jan 30 '12 at 18:44
  • Your example is awesome; I'm currently trying to figure out how to put the sentinel objects when Ctrl+C is pressed without raising an exception. – Dukeatcoding Jun 26 '13 at 09:55
  • @S.Lott: Isn't it that Queue isn't pickleable, and that's why you use [multiprocessing.Manager().Queue](http://stackoverflow.com/questions/3217002/how-do-you-pass-a-queue-reference-to-a-function-managed-by-pool-map-async)? – zuuz Dec 16 '13 at 12:51
  • It is really *much more customizable* than the default `multiprocessing.Pool()`!!! – Daniel Jun 28 '16 at 16:00
  • This way will create 4 separate processes that you manage yourself. If you want a managed pool of worker processes, use Pool https://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers – bmoran May 02 '17 at 19:04

initializer expects an arbitrary callable that does the initialization (e.g., it can set some globals), not a Process subclass; map accepts an arbitrary iterable:

#!/usr/bin/env python
import multiprocessing as mp

def init(val):
    print('do some initialization here')

def compute(data):
    print('Computing things!')
    return data * data

def produce_data():
    yield -100
    for i in range(10):
        yield i
    yield 100

if __name__ == "__main__":
    p = mp.Pool(initializer=init, initargs=('arg',))
    print(p.map(compute, produce_data()))
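
To tie this back to the original concern (expensive initialization done once per worker process rather than once per job), the initializer can build the costly object and store it in a module-level global that compute then reuses. A minimal sketch along those lines; the _resource global and the dict standing in for the expensive object are illustrative assumptions, not part of the answer above:

#!/usr/bin/env python
import multiprocessing as mp

_resource = None  # populated once per worker process

def init(config):
    # runs once in each worker process, not once per job
    global _resource
    print('doing the expensive initialization here')
    _resource = {'config': config}  # stand-in for whatever is costly to build

def compute(data):
    # every job handled by this worker reuses the already-built _resource
    return (_resource['config'], data * data)

if __name__ == "__main__":
    pool = mp.Pool(processes=4, initializer=init, initargs=('arg',))
    print(pool.map(compute, range(10)))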
jfs

Since Python 3.3 you can use starmap, which lets you pass multiple arguments and get back the results with very simple syntax:

import multiprocessing

nb_cores = multiprocessing.cpu_count()

def caps(nb, letter):
    print('Exec nb:', nb)
    return letter.upper()

if __name__ == '__main__':

    multiprocessing.freeze_support() # needed for frozen Windows executables; must be inside the if __name__ == '__main__' block

    input_data = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']
    input_order = [1, 2, 3, 4, 5, 6, 7, 8]

    with multiprocessing.Pool(processes=nb_cores) as pool: # auto closing workers
        results = pool.starmap(caps, zip(input_order, input_data))

    print(results)
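
Note that pool.starmap returns results in the same order as the inputs, so results here should come back as ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H'] regardless of which worker handled each item.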