4

I have a big python array that I would like to break into chunks and then perform a calculation on the chunks and then "reassembly" into one array. Below is what I have so far I'm just starting to learn threading in general and threading with Python.

def performCalc(binaryArray):

    # perform some operation
    rArray = blah * binaryArray
    return rArray

def main(argv):
    numberOfThreads = 5
    str(len(grey_arr) # 100,000 elements
    greyScaleChunks = np.array_split(grey_arr, numberOfThreads)
    for i in range(numberOfThreads):
        t = Thread(target=performCalc, args=(greyScaleChunks[i],))
        t.start()

    # take all the return values and later create one big array to be resized into matrix.

The ordering of the chunks is important and I have to maintain that.

martineau
  • 119,623
  • 25
  • 170
  • 301
user2743
  • 1,423
  • 3
  • 22
  • 34
  • 1
    Threading is the wrong approach here. The time your program spends doing context switches is time it could spend actually computing; only one thread can run at a time. You want the `multiprocessing` module to spread your computation across multiple *processes*, which can be executed in parallel on multiple processors/cores. – chepner Mar 19 '18 at 16:50
  • @chepner This looks like he’s using numpy arrays. If so, the vast majority of his CPU time will be spent inside numpy, with the GIL unlocked, so threading works fine. – abarnert Mar 19 '18 at 16:59
  • It may be simpler to use the `ThreadPoolExecutor` from `concurrent.futures`. You don't actually _need_ a thread pool here, because you have one thread per task, but it still makes it easy to distribute the jobs and gather the results in order, which is exactly the part you're having trouble with. – abarnert Mar 19 '18 at 17:04
  • @chepner Also, the actual context switching is going to be identical for 5 threads vs. 5 processes. 5 processes fighting over 4 cores will do just as badly as 5 threads, while 5 processes using 8 cores will parallelize just as ideally as 5 threads. – abarnert Mar 19 '18 at 17:06
  • I thought threads were restricted to a single process, so all 5 threads would be restricted to a single core, regardless of how many are available? – chepner Mar 19 '18 at 17:11
  • 3
    @chepner Yes, threads are restricted to a single process, but every modern OS handles multithreading within processes, so threads are not restricted to a single core. In CPython, if you're doing significant CPU work in Python itself, you need to hold the GIL (Global Interpreter Lock) while doing each step, which means at any moment all but one of your threads are waiting to acquire that GIL, so you end up with almost no parallelism (and an added cost from those lock operations)—but if you're doing work inside a C extension that releases the GIL for long periods of time, that's not an issue. – abarnert Mar 19 '18 at 17:21

2 Answers2

2

If you want to solve it with explicit Thread objects, and you want to get the results of the thread functions, you need to hold onto those Thread objects so you can later join them and pull out their results. Like this:

ts = []
for i in range(numberOfThreads):
    t = Thread(target=performCalc, args=(greyScaleChunks[i],))
    ts.append(t)
    t.start()
for t in ts:
    t.join()
# When you get here, all threads have finished

Also, the default implementation of Thread.run just calls your target and throws away the result. So you need to store the return value somewhere the main thread can access. Many numpy programs do this by passing in a pre-allocated array to each thread, so they can fill them in, and that isn't too huge a change to your design, but it's not the way you're headed. You can of course pass in any other mutable object to mutate. Or set a global variable, etc. But you've designed this around returning a value, and that's a nice way to think about things, so let's stick with that. The easiest way to make that work is to subclass Thread:

class ReturningThread(threading.Thread):
    def run(self):
        try:
            if self._target:
                self._result = self._target(*self._args, **self._kwargs)
        finally:
            del self._target, self._args, self._kwargs

    def join(self):
        super().join()
        return self._result

This is untested code, but it should work. (I've done similar things in real code, but more complicated, to allow join to handle timeouts properly; here I kept it dead simple, just adding a _result = in the run method and returning it in join.)

So:

ts = []
for i in range(numberOfThreads):
    t = ReturningThread(target=performCalc, args=(greyScaleChunks[i],))
    ts.append(t)
    t.start()
results = []
for t in ts:
    results.append(t.join())

And now you have a list of arrays that you can stack together.


However, what I did above is basically turn each thread into a half-assed future. It may be conceptually simpler to just use actual futures. This does mean that we're now using a thread pool that we don't really have any need for—there's exactly one task per thread. There's a probably-negligible performance cost (you're spending a lot more time on the actual work than the queueing, or you wouldn't want to thread this way in the first place), but, more importantly, we're adding significant extra complexity buried under the hood (in a well-tested stdlib module) for a bit less complexity in our code; whether or not that's worth it is up to you. Anyway:

with concurrent.futures.ThreadPoolExecutor(max_workers=numberOfThreads) as x:
    results = x.map(performCalc, greyScaleChunks)

This handles creating 5 threads, creating a job for each performCalc(chunk), partitioning the 5 jobs out to the 5 threads, joining the threads, and gathering the 5 jobs' results in order, so all you have to do is stack up the results.


Another advantage of using an executor is that if it turns out your code isn't benefiting from thread-parallelism because of the GIL (unlikely to be a problem in your case—you should be spending most of your time in a numpy operation over 20000 rows, which will run with the GIL released—but obviously you have to test to verify that's true), you can switch to processes very easily: just change that ThreadPoolExecutor to a ProcessPoolExecutor and you're done.

It's possible that your args and returns can't be either copied or shared between processes the default way, or that doing so is so expensive that it kills all the benefits of parallelism—but the fact that you can test that with a one-word change, and then only deal with it if it's a problem, is still a win.

abarnert
  • 354,177
  • 51
  • 601
  • 671
0

You can do it by using the largely undocumented ThreadPool (mentioned in this answer) and its map_async() method as shown in the following runnable example:

import numpy as np
from pprint import pprint
from multiprocessing.pool import ThreadPool
import threading

blah = 2

def performCalc(binaryArray):
    # perform some operation
    rArray = blah * binaryArray
    return rArray

def main(data_array):
    numberOfThreads = 5
    pool = ThreadPool(processes=numberOfThreads)

    greyScaleChunks = np.array_split(data_array, numberOfThreads)
    results = pool.map_async(performCalc, greyScaleChunks)
    pool.close()
    pool.join()  # Block until all threads exit.

    # Final results will be a list of arrays.
    pprint(results.get())

grey_arr = np.array(range(50))
main(grey_arr)

Results printed:

[array([ 0,  2,  4,  6,  8, 10, 12, 14, 16, 18]),
 array([20, 22, 24, 26, 28, 30, 32, 34, 36, 38]),
 array([40, 42, 44, 46, 48, 50, 52, 54, 56, 58]),
 array([60, 62, 64, 66, 68, 70, 72, 74, 76, 78]),
 array([80, 82, 84, 86, 88, 90, 92, 94, 96, 98])]
martineau
  • 119,623
  • 25
  • 170
  • 301
  • If the expense (more conceptual than performance) of a thread pool isn't an issue here, I think `ThreadPoolExecutor` is clearer than `ThreadPool` (or `dummy.Pool`). And not just because it's documented, while the others are not (and may even go away in a future version). If you need the ability to rapidly change between `[i]map[_async]` variants while experimenting during development, or need to dive under the hood, I'd go with `multiprocessing`, but if not, it's hard to beat `with …Executor as x: r=x.map(…)`. – abarnert Mar 19 '18 at 18:10
  • @abarnert: I haven't found the lack of documentation to be a big problem since a `ThreadPool` operates mostly like a `multiprocessing.pool.Pool` does—and there's lots of information and examples available about using them which applies to this subclass. I suppose it _could_ go away at some point in the future, even though it's been around since Python 2.x. – martineau Mar 19 '18 at 18:28
  • It's been proposed for removal twice at least twice that I know of since 3.0. Although the motivation tends to be that at least `dummy` is "sort of" documented, which would give you something to switch to (although in one discussion someone pointed out that the way it's written, a Python implementation could use a sequential implementation instead of threads even if it had threads). Also, bugs in `ThreadPool` and `dummy.Pool` tend to get less attention than bugs in documented stuff—e.g., a memory leak that was known since 3.3 or so and only fixed in 3.6 (and not backported to 2.7). – abarnert Mar 19 '18 at 18:35