
I have workers and tasks to do:

workers = ['peter', 'paul', 'mary']
tasks = range(13)

Now I want to split the tasks into chunks or batches of work, so that each worker works on one batch and does about the same amount of work as everybody else. In real life I want to schedule batch jobs to a compute farm. The batch jobs are supposed to run in parallel. The actual scheduling and dispatch is done by a commercial-grade tool such as LSF or grid.

Some examples of what I would expect:

>>> distribute_work(['peter', 'paul', 'mary'], range(3))
[('peter', [0]), ('paul', [1]), ('mary', [2])]
>>> distribute_work(['peter', 'paul', 'mary'], range(6))
[('peter', [0, 3]), ('paul', [1, 4]), ('mary', [2, 5])]
>>> distribute_work(['peter', 'paul', 'mary'], range(5))
[('peter', [0, 3]), ('paul', [1, 4]), ('mary', [2])]

This question is very similar to the questions here, here, and here

The difference is that I want these features, in this order of precedence:

  1. No use of len, if possible no build-up of long data structures internally
  2. Accept a generator
  3. Return generators
  4. As much use of stdlib components as possible

Some side notes on requirements:

  • No dicts on purpose: I have workers with the same name that can do multiple batches (Unix hostnames). If your solution uses dicts, that's fine, because we can always look up workers by a batch enumeration.
  • Arbitrary length: Both workers and tasks can be iterables of any length >= 1. And they do not have to split evenly as shown in the example above where Mary only gets one task.
  • Order: To me it is not important. I guess others may prefer some order like [0,1], [2,3], [5], but I don't care. If your solution can keep or switch the order, that may be worth pointing out to others (see the sketch after this list).
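To make the ordering remark concrete, here is a tiny sketch of the two splits for five tasks and three workers (just an illustration, not a solution):

tasks = list(range(5))

# Round-robin split, as in the examples above:
print([tasks[k::3] for k in range(3)])           # [[0, 3], [1, 4], [2]]

# Contiguous chunks, the order some may prefer:
print([tasks[k:k + 2] for k in range(0, 5, 2)])  # [[0, 1], [2, 3], [4]]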

I have tried to wrap my head around itertools and this particular problem and came up with the following code to illustrate the question:

from itertools import cycle, groupby

def distribute_work(workers, tasks):
    # Tag each task with a worker index (round-robin), sort by that index,
    # then group the tasks per index.
    batches = range(len(workers))
    tagged = sorted(zip(cycle(batches), tasks), key=lambda t: t[0])
    return [(workers[k], [task for _, task in group])
            for k, group in groupby(tagged, key=lambda t: t[0])]

This satisfies 4., but the sort very likely violates 1., and 2. and 3. are not addressed at all.

Probably there's some easy solution to this, combining some stdlib components in a way I haven't thought of. But maybe not. Any takers?

cfi

5 Answers


I think you want to use multiprocessing.Pool.imap to handle your workers and allocate their jobs. I believe it does everything you want.

jobs = (some generator)                   # can consume jobs from a generator
pool = multiprocessing.Pool(3)            # set number of workers here
results = pool.imap(process_job, jobs)    # returns a generator

for r in results:                         # loop will block until results arrive
    do_something(r)

If the order of the results doesn't matter for your application, you can also use imap_unordered.
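For completeness, a minimal runnable sketch of that pattern; `process_job` here is just a stand-in for the real work:

import multiprocessing

def process_job(job):
    return job * job                              # stand-in for the real work

if __name__ == '__main__':
    jobs = (n for n in range(13))                 # jobs can come from a generator
    with multiprocessing.Pool(3) as pool:         # three workers
        for result in pool.imap(process_job, jobs):
            print(result)                         # results arrive lazily, in input order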

Blckknght
  • Hm. If I do the job dispatch in Python this will do, I guess. It's not really answering the question, but it may very well solve my overall problem. I was not yet planning to code the actual job dispatch in the same python script and probably do not want to do this. The overall system is more complex, and the actual job dispatch currently happens in shell scripts where all the config is already available. I was just trying to hack a ten liner in Python to deal with the ordering/allocation sub problem. Possibly this still works if I let `def process_job` just create/print the shell commands. – cfi Oct 31 '12 at 09:17
  • @cfi: Hmm, I'm not sure I understand. If you don't want to consume the `jobs` generator completely right from the start (as you do in your code in the question), you will need to let Python have some control of the synchronization between the workers and the supply of jobs (something like `multiprocessing.Queue` at a minimum). If you're going that route though, I'd let the `multiprocessing` module handle as much of it as possible, rather than reinventing its `Pool` class yourself. But perhaps if you explain more about your system architecture we can come up with something else? – Blckknght Oct 31 '12 at 09:43

Do you have to pre-batch?

Why not just have a queue, and have each worker pop off the queue when it finishes a work unit?
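To make that idea concrete, here is a minimal sketch assuming the workers are threads in the same process (names and the printout are placeholders; a real compute-farm setup would dispatch differently):

import queue
import threading

def worker(name, work_queue):
    # Each worker pulls the next task as soon as it is free.
    while True:
        task = work_queue.get()
        if task is None:                   # sentinel: no more work for this worker
            break
        print(name, 'handles task', task)

work_queue = queue.Queue()
for task in range(13):
    work_queue.put(task)
for _ in ('peter', 'paul', 'mary'):        # one stop sentinel per worker
    work_queue.put(None)

threads = [threading.Thread(target=worker, args=(name, work_queue))
           for name in ('peter', 'paul', 'mary')]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()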

Tyler Eaves
  • Good point. Got to clarify that this is about scheduling jobs for machines that ought to run in parallel. The whole point is parallelizing the workload to reduce the latency from start to result. – cfi Oct 31 '12 at 06:46

Following Tyler's answer:

import itertools

def doleOut(queue, workers):
    for worker, task in itertools.izip(itertools.cycle(workers), queue):
        yield worker, task

This will keep returning (worker, task) tuples as long as there's a queue. So if you have a blocking waitForMoreWork you can do this:

queue = []
doler = doleOut(queue, workers)
while 1:
    queue.append(waitForMoreWork)
    currentqueuelen = len(queue)
    for i in range(0, currentqueuelen):
        worker, item = doler.next()
        worker.passitem(item)

That way it will block until there are more queue items, then distribute those, then block again. You can set up your waitForMoreWork expression to hand out as many items at a time as seems sensible.

Phil H
  • For Python 3 please replace `itertools.izip` with `zip`. – cfi Oct 31 '12 at 09:29
  • Unfortunately this is half the solution I have - albeit much simpler & nicer. This produces just one generator, I would need one per worker. – cfi Oct 31 '12 at 09:30
  • @cfi: Unless you have a single method distributing the load, how will you distribute them correctly? The only way would be to enumerate them and use modulo or somesuch value, but somewhere you'll have to supply the enumeration... – Phil H Oct 31 '12 at 09:52
  • Correct. Sticking to generators we'd have to first create something like the output of your `doleOut`, then `itertools.tee` it for every worker, and then use a generator that returns only every `n`th step. This would seem like overkill to me. – cfi Oct 31 '12 at 10:00

OK, after saying it's impossible, here's an idea. Maybe this is something I should move to Code Review; I'd very much be interested in comments on how much overhead this incurs in memory. In other words, I don't know whether this really solves the problem where the task list is very long and of unknown size. As Blckknght mentioned, multiprocessing might be the better alternative.

The code:

import itertools

def distribute_work(workers, tasks):
    """Return one generator per worker with a fair share of tasks.

    tasks may be a generator of arbitrary length.
    workers must be a sequence (len() and indexing are used).
    """
    worker_count = len(workers)
    # One independent copy of the task stream per worker ...
    all_tasks_for_all_workers = itertools.tee(tasks, worker_count)
    # ... from which each worker takes every worker_count-th task,
    # starting at its own offset.
    assignments = [(workers[idx], itertools.islice(it, idx, None, worker_count))
                   for idx, it in enumerate(all_tasks_for_all_workers)]
    return assignments

The algorithm is to

  1. Duplicate the original task list once for each worker. Since this only duplicates generator objects, it should be agnostic to the size of the task list in memory. Even if this is a relatively expensive operation, it is a one-time startup cost only and insignificant in memory for very large task lists.
  2. To assign tasks, each worker has to grab a slice of the task list. If #W is the number of workers, the first worker takes tasks 0, #W, 2*#W, 3*#W, etc., and the second worker takes 1, #W+1, 2*#W+1, 3*#W+1, etc. The slicing for each worker can be done with itertools.islice (a quick demonstration follows this list).
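A quick interactive demonstration of the striding that `itertools.islice` performs here, with three workers and thirteen tasks:

>>> import itertools
>>> list(itertools.islice(range(13), 0, None, 3))
[0, 3, 6, 9, 12]
>>> list(itertools.islice(range(13), 1, None, 3))
[1, 4, 7, 10]
>>> list(itertools.islice(range(13), 2, None, 3))
[2, 5, 8, 11]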

For a pure splitting up/assignment of tasks, the names of the workers are not really required by this function, but the number of workers is. Changing this would make the function more versatile and useful, and make the return value easier to understand. To answer my own question, I'll leave the function as is (a count-based variant is sketched below).
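For illustration, a count-based variant could look like the following sketch (the name `distribute_work_by_count` is made up; it drops the worker names entirely):

import itertools

def distribute_work_by_count(worker_count, tasks):
    """Return one task generator per worker slot (hypothetical variant)."""
    streams = itertools.tee(tasks, worker_count)
    return [itertools.islice(stream, idx, None, worker_count)
            for idx, stream in enumerate(streams)]

>>> for tasks in distribute_work_by_count(3, range(5)):
...     print(list(tasks))
...
[0, 3]
[1, 4]
[2]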

Usage and result:

>>> for (worker,tasks) in distribute_work(['peter', 'paul', 'mary'], range(5)):
...   print(worker, list(tasks))
... 
peter [0, 3]
paul [1, 4]
mary [2]

And it also handles the cases where workers have the same names but are different entities:

>>> for (worker,tasks) in distribute_work(['p', 'p', 'mary'], range(5)): 
...   print(worker, list(tasks))
... 
p [0, 3]
p [1, 4]
mary [2]
cfi

Here's an approach I like:

import itertools
import math
import os
from multiprocessing import Pool

parallelism = os.cpu_count()
num_todos = len(todos)

# this zip fanciness makes each chunk stripe through the data sequentially overall so that the
# first items still get done first across all the workers
chunksize = math.ceil(num_todos / parallelism)
chunks = list(itertools.zip_longest(*[todos[i:i+chunksize] for i in range(0, num_todos, chunksize)]))
chunks = [[c for c in chunk if c is not None] for chunk in chunks]

with Pool(processes=parallelism) as pool:
    tasks = [pool.apply_async(my_function, args=(chunk,)) for chunk in chunks]
    [task.get() for task in tasks]

Depending on whether you need to accumulate the results you could adjust this, but the interesting part to me is having the workers collaborate to get things done in global order (in my case, processing sequential frames of images so I can see how things look while all the CPUs are cranking).
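To see what the zip striping does to the chunks, here is a toy run with ten hypothetical todos and three workers:

>>> import itertools, math
>>> todos = list(range(10))
>>> parallelism = 3
>>> chunksize = math.ceil(len(todos) / parallelism)
>>> raw = [todos[i:i + chunksize] for i in range(0, len(todos), chunksize)]
>>> raw                                      # plain contiguous chunks
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
>>> chunks = list(itertools.zip_longest(*raw))
>>> [[c for c in chunk if c is not None] for chunk in chunks]
[[0, 4, 8], [1, 5, 9], [2, 6], [3, 7]]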

Karl Rosaen