If you want to solve it with explicit Thread objects, and you want to get the results of the thread functions, you need to hold onto those Thread objects so you can later join them and pull out their results. Like this:
from threading import Thread

ts = []
for i in range(numberOfThreads):
    t = Thread(target=performCalc, args=(greyScaleChunks[i],))
    ts.append(t)
    t.start()
for t in ts:
    t.join()
# When you get here, all threads have finished
Also, the default implementation of Thread.run just calls your target and throws away the result. So you need to store the return value somewhere the main thread can access. Many numpy programs do this by passing a pre-allocated array into each thread, so each thread can fill in its own piece of the output; that isn't too huge a change to your design, but it's not the way you're headed. You can of course pass in any other mutable object to mutate, or set a global variable, etc.
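For reference, a minimal sketch of that pre-allocated-array pattern might look like this (outputShape and performCalcInto are hypothetical stand-ins for your real code, and it assumes performCalc produces one output row per input row):

import numpy as np
from threading import Thread

out = np.empty(outputShape)  # hypothetical shape for the combined result

def performCalcInto(chunk, dest):
    # hypothetical in-place variant: write into the caller's slice instead of returning
    dest[:] = performCalc(chunk)

ts = []
row = 0
for chunk in greyScaleChunks:
    # each thread gets a view into its own region of the shared output array
    t = Thread(target=performCalcInto, args=(chunk, out[row:row + len(chunk)]))
    ts.append(t)
    t.start()
    row += len(chunk)
for t in ts:
    t.join()
# out is now fully filled in; there are no per-thread results to gather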
But you've designed this around returning a value, and that's a nice way to think about things, so let's stick with that. The easiest way to make that work is to subclass Thread:
import threading

class ReturningThread(threading.Thread):
    def run(self):
        try:
            if self._target:
                # stash the target's return value instead of throwing it away
                self._result = self._target(*self._args, **self._kwargs)
        finally:
            # avoid a reference cycle, just like the base class's run does
            del self._target, self._args, self._kwargs

    def join(self):
        super().join()
        return self._result
This is untested code, but it should work. (I've done similar things in real code, but more complicated, to allow join to handle timeouts properly; here I kept it dead simple, just adding a _result = in the run method and returning it in join.)
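If you're curious, a timeout-aware version might look something like this sketch (the name TimeoutReturningThread and the raise-on-timeout behavior are just one choice among several, not the code I used in real life):

import threading

class TimeoutReturningThread(ReturningThread):
    _result = None  # default, in case the target raised and never set a result

    def join(self, timeout=None):
        # bypass ReturningThread.join, which doesn't take a timeout
        threading.Thread.join(self, timeout)
        if self.is_alive():
            # the timeout expired before the target finished
            raise TimeoutError('thread still running')
        return self._result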
So:
ts = []
for i in range(numberOfThreads):
    t = ReturningThread(target=performCalc, args=(greyScaleChunks[i],))
    ts.append(t)
    t.start()
results = []
for t in ts:
    results.append(t.join())
And now you have a list of arrays that you can stack together.
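If the chunks were split along the first axis, reassembling them is a one-liner (this assumes numpy is imported as np and that concatenating along axis 0 is the right way to undo your chunking):

import numpy as np

finalResult = np.concatenate(results)  # stitch the per-chunk arrays back together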
However, what I did above is basically turn each thread into a half-assed future. It may be conceptually simpler to just use actual futures. This does mean that we're now using a thread pool that we don't really have any need for—there's exactly one task per thread. There's a probably-negligible performance cost (you're spending a lot more time on the actual work than the queueing, or you wouldn't want to thread this way in the first place), but, more importantly, we're adding significant extra complexity buried under the hood (in a well-tested stdlib module) for a bit less complexity in our code; whether or not that's worth it is up to you. Anyway:
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=numberOfThreads) as x:
    results = list(x.map(performCalc, greyScaleChunks))  # map returns an iterator, so listify it
This handles creating 5 threads, creating a job for each performCalc(chunk), partitioning the 5 jobs out to the 5 threads, joining the threads, and gathering the 5 jobs' results in order, so all you have to do is stack up the results.
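If you want to see the moving parts that map hides, the explicit-futures equivalent looks roughly like this (a sketch of the same behavior, not the executor's actual implementation):

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=numberOfThreads) as x:
    # one job per chunk, all submitted up front
    futures = [x.submit(performCalc, chunk) for chunk in greyScaleChunks]
    # result() blocks until that job finishes, so this gathers results in order
    results = [f.result() for f in futures]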
Another advantage of using an executor is that if it turns out your code isn't benefiting from thread-parallelism because of the GIL (unlikely to be a problem in your case—you should be spending most of your time in a numpy operation over 20000 rows, which will run with the GIL released—but obviously you have to test to verify that's true), you can switch to processes very easily: just change that ThreadPoolExecutor to a ProcessPoolExecutor and you're done.
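Concretely, that looks like this; the one caveat I'll add is that on platforms that spawn worker processes (Windows, and macOS by default on newer Pythons), the pool has to be created under an if __name__ == '__main__': guard, and performCalc has to be defined at module top level so the workers can unpickle it:

import concurrent.futures

if __name__ == '__main__':
    # the guard keeps spawned workers from re-running this block on import
    with concurrent.futures.ProcessPoolExecutor(max_workers=numberOfThreads) as x:
        results = list(x.map(performCalc, greyScaleChunks))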
It's possible that your args and returns can't be either copied or shared between processes the default way, or that doing so is so expensive that it kills all the benefits of parallelism—but the fact that you can test that with a one-word change, and then only deal with it if it's a problem, is still a win.
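To make that test concrete, a quick-and-dirty harness might look like this sketch (timings will vary, and it reuses the same __main__ guard for the process pool's sake):

import concurrent.futures
import time

if __name__ == '__main__':
    for Executor in (concurrent.futures.ThreadPoolExecutor,
                     concurrent.futures.ProcessPoolExecutor):
        t0 = time.perf_counter()
        with Executor(max_workers=numberOfThreads) as x:
            results = list(x.map(performCalc, greyScaleChunks))
        print(Executor.__name__, time.perf_counter() - t0)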