2

I have a numerical function in python (based on scipy.optimize.minimize)

def func(x):
   //calculation, returning 0 if done

and an algorithm as follows:

for x in X:
    run func(x) 
    terminate the loop if  one of func(x) returns 0 

Above, X is a large set of doubles, each func(x) is independent from the other.

Question: Which of Python's multi-threading/multi-processing functionality can I use to maximize the performance of this calculation?

For info, I am using a multi-core computer.

zell
  • 9,830
  • 10
  • 62
  • 115
  • If this code section, especially `func`'s function body, is really performance-critical, I'd say I'll try implement `func` as a C extension. After that, I'll try two things: heuristics that predict which `x` in `X` is likely to evaluate to `0`, and scale it up in parallel. Not an answer, but a general plan. – Cong Ma Dec 29 '15 at 01:39
  • Thanks. As mentioned, the code is based on Scipy, which calls Fortran in its core. – zell Dec 29 '15 at 02:04

1 Answers1

1

If you have multiple cores then you will need to use multiprocessing to see the benefit. To get a result from part-way through a large number of candidates, you can break it up into batches. This example code ought to help see what to do.

"""
Draws on https://pymotw.com/2/multiprocessing/communication.html

"""
import multiprocessing


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                self.task_queue.task_done()
                break
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Optimiser(object):

    def __init__(self, x):
        self.x = x

    def __call__(self):
        # scipy optimisation function goes here
        if self.x == 49195:
            return self.x


def chunks(iterator, n):
    """Yield successive n-sized chunks from iterator.
    http://stackoverflow.com/a/312464/1706564

    """
    for i in xrange(0, len(iterator), n):
        yield iterator[i:i+n]


if __name__ == '__main__':
    X = range(1, 50000)
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count()
    consumers = [ Consumer(tasks, results)
                  for i in xrange(num_consumers) ]

    for w in consumers:
        w.start()

    chunksize = 100  # this should be sized run in around 1 to 10 seconds
    for chunk in chunks(X, chunksize):
        num_jobs = chunksize
        # Enqueue jobs
        for x in chunk:
            tasks.put(Optimiser(x))

        # Wait for all of the tasks to finish
        tasks.join()

        # Start checking results
        while num_jobs:
            result = results.get()
            num_jobs -= 1
            if result:
                # Add a poison pill to kill each consumer
                for i in xrange(num_consumers):
                    tasks.put(None)
                print 'Result:', result
                break
Jamie Bull
  • 12,889
  • 15
  • 77
  • 116
  • Thanks. Way too heavy though. – zell Dec 29 '15 at 17:04
  • Sorry. The issue is you want to know if the answer has been found part-way through the parallelised calculation. I'd love to see a simpler answer that would work with a `multiprocessing.Pool` but I don't know how that would work. I can say for sure you won't see any benefit with `threading` though. – Jamie Bull Dec 29 '15 at 23:02