
The scenario:

I have a really large DB model migration going on for a new build, and I'm working on boilerplating how we will go about migrating current live data from a webapp into the local test databases.

I'd like to set up a Python script that will concurrently process the migration of my models. I have from_legacy and to_legacy methods for my model instances. What I have so far loads all my instances and creates a thread for each; each thread is a subclass of threading.Thread whose run method just does the conversion and saves the result.

I'd like the main loop of the program to build a big stack of these thread instances and process them one by one, running at most 10 concurrently and feeding the next one in as others finish migrating.

What I can't figure out is how to use the queue correctly for this. If each thread represents the full task of migration, should I load all the instances first, then create a Queue with maxsize set to 10 and have it track only the currently running threads? Something like this, perhaps:

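from queue import Queue  # the module is named "Queue" on Python 2

currently_running = Queue(maxsize=10)
for model in models:
    task = Migrate(model)  # Migrate is a threading.Thread subclass
    currently_running.put(task)  # put() blocks once the queue holds 10 tasks
    task.start()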

In this case I'd be relying on the put call to block while the queue is at capacity. If I went this route, how would I call task_done?

Or should the Queue instead hold all the tasks (not just the started ones), with join used to block until completion? Does calling join on a queue of threads start the threads it contains?

What is the best way to approach the "at most N running threads" problem, and what role should the Queue play?
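Regarding the join/task_done part of the question: a Queue is only a container, and Queue.join() does not start anything. It blocks until task_done() has been called once for every item that was put(). A minimal sketch, with integers standing in for model instances and a single hypothetical consumer thread:

```python
import threading
from queue import Queue  # "Queue" on Python 2

q = Queue()
for item in range(5):
    q.put(item)  # put() only stores the item; no thread is started

processed = []

def consume():
    # The consumer thread has to be created and started explicitly.
    while True:
        item = q.get()
        processed.append(item)
        q.task_done()  # one task_done() per get() lets join() make progress

t = threading.Thread(target=consume, daemon=True)
t.start()

q.join()  # returns once task_done() has been called for all 5 items
print(processed)  # [0, 1, 2, 3, 4]
```

If nothing ever calls get() and task_done(), q.join() simply blocks forever.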

DeaconDesperado
  • You don't want to do it that way because while a job is being processed, it won't be in the queue of waiting jobs. You'd need a queue just of "in flight" jobs and that seems kind of silly. Just create the number of threads you want. There is no such thing as "included threads", but so long as each thread doesn't terminate until its job is done (including waiting for any threads that are part of that), then just waiting on the main threads should do it. – David Schwartz Dec 13 '12 at 21:40
  • So create ten threads that themselves are fed the model instances, and not a thread for each? How could I restart them after they each finish migrating? – DeaconDesperado Dec 13 '12 at 21:43
  • Don't restart them. The thread just keeps pulling jobs from the queue until the queue is empty. – David Schwartz Dec 13 '12 at 21:53
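The approach suggested in these comments (a fixed number of threads, each pulling jobs from a queue until it is empty) might look like the sketch below. migrate() is a hypothetical stand-in for the real from_legacy/to_legacy conversion and save, and the models are plain integers:

```python
import threading
from queue import Queue, Empty  # "Queue" on Python 2

NUM_WORKERS = 10  # at most this many migrations run at once

def migrate(model):
    # Hypothetical placeholder for the real conversion and save.
    return model * 2

def worker(jobs, results, lock):
    # Each thread keeps pulling jobs until the queue is drained, then exits.
    while True:
        try:
            model = jobs.get_nowait()
        except Empty:
            return
        result = migrate(model)
        with lock:
            results.append(result)

def run_migration(models, num_workers=NUM_WORKERS):
    jobs = Queue()
    for model in models:
        jobs.put(model)  # the queue holds jobs (data), not threads
    results, lock = [], threading.Lock()
    threads = [threading.Thread(target=worker, args=(jobs, results, lock))
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait on the worker threads themselves
    return results

if __name__ == "__main__":
    print(sorted(run_migration(range(25))))
```

Filling the queue before the threads start is what makes "queue is empty" a safe exit condition here; if jobs were added while workers run, putting one sentinel value per worker is the usual alternative.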

2 Answers


Although not documented (at the time of writing), the multiprocessing.pool module has a ThreadPool class which, as its name implies, creates a pool of threads. It shares the same API as the multiprocessing.Pool class.

You can then send tasks to the thread pool using pool.apply_async:

import multiprocessing.pool as mpool

def worker(task):
    # work on task
    print(task)     # substitute your migration code here.

# create a pool of 10 threads
pool = mpool.ThreadPool(10)
N = 100

for task in range(N):
    pool.apply_async(worker, args=(task,))

pool.close()
pool.join()
unutbu
  • This is looking good! Would it be okay to substitute the `len` of my complete list of tasks for `N`? The pool will keep track of what's live? – DeaconDesperado Dec 14 '12 at 15:45
  • Also, the arg `task` should be a process/thread, or just a function representing the procedure? – DeaconDesperado Dec 14 '12 at 15:47
  • @DeaconDesperado: Yes, instead of `N`, you could use the `len` of a list of tasks. The `task` should *NOT* be a thread. Let the `pool` coordinate the threads for you. Rather, let `task` be whatever object or data you need to pass to `worker` to distinguish one migration task from another. Given the code you posted, I think perhaps `task` should equal `model`, but I'm not quite clear about what `from_legacy` or `to_legacy` is doing, so I could be wrong. – unutbu Dec 14 '12 at 18:25
  • Note: "New in version 2.6" – dfrankow Feb 28 '13 at 00:20
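Following the suggestion that task should be the model rather than a thread, a sketch might pass model instances straight to the pool. LegacyModel and its convert() method are hypothetical stand-ins for the real models and their from_legacy/to_legacy conversion. Keeping the AsyncResult objects matters: an exception raised inside worker is stored in its AsyncResult and is only re-raised when .get() is called.

```python
from multiprocessing.pool import ThreadPool

class LegacyModel(object):
    # Hypothetical model; substitute your real from_legacy/to_legacy logic.
    def __init__(self, pk):
        self.pk = pk

    def convert(self):
        return {"id": self.pk}

def worker(model):
    # The task is the model instance itself; the pool manages the threads.
    converted = model.convert()
    # ... save `converted` to the test database here ...
    return converted["id"]

models = [LegacyModel(pk) for pk in range(100)]

pool = ThreadPool(10)  # at most 10 migrations run at once
async_results = [pool.apply_async(worker, args=(m,)) for m in models]
pool.close()
pool.join()

# .get() returns each result in submission order and re-raises worker errors.
migrated = [r.get() for r in async_results]
print(len(migrated))  # 100
```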

This should probably be done using semaphores; the semaphore example in the threading documentation hints at what you're trying to accomplish.
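A minimal sketch of the semaphore idea, assuming a threading.BoundedSemaphore with 10 slots: the main loop acquires a slot before starting each thread, and each thread releases its slot when it finishes, so no more than 10 migrations are in flight at once. The Migrate class, the time.sleep placeholder, and the running/peak counters (there only to show the cap holds) are hypothetical.

```python
import threading
import time

MAX_CONCURRENT = 10
slots = threading.BoundedSemaphore(MAX_CONCURRENT)

stats_lock = threading.Lock()
running = 0  # migrations currently in progress
peak = 0     # highest value `running` ever reached

class Migrate(threading.Thread):
    # Hypothetical take on the question's Thread subclass: one thread per model.
    def __init__(self, model):
        super().__init__()
        self.model = model

    def run(self):
        global running, peak
        try:
            with stats_lock:
                running += 1
                peak = max(peak, running)
            time.sleep(0.01)  # placeholder for the real conversion and save
            with stats_lock:
                running -= 1
        finally:
            slots.release()  # free a slot so the main loop can start the next model

def migrate_all(models):
    threads = []
    for model in models:
        slots.acquire()  # blocks while MAX_CONCURRENT migrations are in flight
        t = Migrate(model)
        t.start()
        threads.append(t)
    for t in threads:
        t.join()

migrate_all(range(50))
print(peak)
```

Unlike the worker-pool approach, this still creates one thread per model, so for a very large migration a fixed pool of worker threads is likely the cheaper option.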

Bula