
I have a function that accepts a large array of x,y pairs as input, does some elaborate curve fitting using numpy and scipy, and then returns a single value. To try to speed things up, I am running two threads that I feed data to using Queue.Queue. Once the data is processed, I want the threads to terminate, end the calling process, and return control to the shell.

I am trying to understand why I have to resort to a private method in threading.Thread to stop my threads and return control to the command line.

Calling self.join() does not end the program. The only way to get control back was to call the private _Thread__stop() method:

        def stop(self):
            print "STOP CALLED"
            self.finished.set()
            print "SET DONE"
            # self.join(timeout=None) does not work
            self._Thread__stop()

Here is an approximation of my code:

    import threading
    from Queue import Queue  # Python 2 module name; "queue" in Python 3

    class CalcThread(threading.Thread):
        def __init__(self,in_queue,out_queue,function):
            threading.Thread.__init__(self)
            self.in_queue = in_queue
            self.out_queue = out_queue
            self.function = function
            self.finished = threading.Event()

        def stop(self):
            print "STOP CALLED"
            self.finished.set()
            print "SET DONE"
            self._Thread__stop()

        def run(self):
            while not self.finished.isSet():
                params_for_function = self.in_queue.get()
                try:
                    tm = self.function(params_for_function)
                    self.in_queue.task_done()
                    self.out_queue.put(tm)
                except ValueError as v:
                    # modify params and reinsert into the queue
                    window = params_for_function["window"]
                    params_for_function["window"] = window + 1
                    self.in_queue.put(params_for_function)
                    # mark the failed item as done so in_queue.join() can still return
                    self.in_queue.task_done()

    def big_calculation(params):
        # params is the dict placed on in_queue: well_id, window, data_arrays
        # do some analysis to calculate tm
        return tm

    if __name__ == "__main__":
        NUM_THREADS = 2
        workers = []
        in_queue = Queue()
        out_queue = Queue()

        for i in range(NUM_THREADS):
            w = CalcThread(in_queue,out_queue,big_calculation)
            w.start()
            workers.append(w)

        if options.analyze_all:
            for i in well_ids:
                in_queue.put(dict(well_id=i, window=10, data_arrays=my_data_dict))

        in_queue.join()
        print "ALL THREADS SEEM TO BE DONE"
        # gather data and report it from out_queue
        for i in well_ids:
            p = out_queue.get()
            print p
            out_queue.task_done()
            # I had to do this to get the out_queue to proceed
            if out_queue.qsize() == 0:
                out_queue.join()
                break
        # Calling this stop method does not seem to return control to the
        # command line unless I use the private threading.Thread method _Thread__stop()

        for aworker in workers:
            aworker.stop()
– harijay

3 Answers


In general it is a bad idea to kill a thread that modifies a shared resource.

CPU-intensive tasks in multiple threads are worse than useless in Python unless you release the GIL while performing computations. Many numpy functions do release the GIL.
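As a rough illustration (a sketch added here, not part of the original answer; numpy.dot is just a stand-in for the curve-fitting work), BLAS-backed calls such as numpy.dot release the GIL while they execute, so two threads can overlap them:

# Sketch: compare one thread doing one big matrix product against two threads
# doing one each. If the GIL is released inside numpy.dot (it is for
# BLAS-backed float arrays), the two-thread run takes roughly as long as the
# one-thread run on a machine with 2+ cores, instead of twice as long.
import threading
import time
import numpy as np

data = np.random.rand(1500, 1500)

def numpy_work():
    np.dot(data, data)  # heavy BLAS call; the GIL is released while it runs

def elapsed(n_threads):
    threads = [threading.Thread(target=numpy_work) for _ in range(n_threads)]
    start = time.time()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.time() - start

print("1 thread: %.2fs   2 threads: %.2fs" % (elapsed(1), elapsed(2)))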

ThreadPoolExecutor example from the docs

import concurrent.futures # on Python 2.x: pip install futures 

calc_args = []
if options.analyze_all:
    calc_args.extend(dict(well_id=i,...) for i in well_ids)

with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
    future_to_args = dict((executor.submit(big_calculation, args), args)
                           for args in calc_args)

    while future_to_args:
        # iterate over a snapshot so future_to_args can be modified inside the loop
        for future in concurrent.futures.as_completed(dict(future_to_args)):
            args = future_to_args.pop(future)
            if future.exception() is not None:
                print('%r generated an exception: %s' % (args,
                                                         future.exception()))
                if isinstance(future.exception(), ValueError):
                    #modify params and resubmit
                    args["window"] += 1
                    future_to_args[executor.submit(big_calculation, args)] = args

            else:
                print('%r returned %r' % (args, future.result()))

print("ALL work SEEMs TO BE DONE")

You could replace ThreadPoolExecutor with ProcessPoolExecutor if there is no shared state. Put the code in your main() function.
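For example (a sketch, assuming the calc_args dicts and big_calculation's return value are picklable, which ProcessPoolExecutor requires), the change is essentially one line:

# Sketch: same submit/as_completed loop as above, only the executor class changes.
# Arguments and results cross process boundaries, so they must be picklable, and
# the code has to live under the if __name__ == "__main__" guard.
with concurrent.futures.ProcessPoolExecutor(max_workers=NUM_THREADS) as executor:
    future_to_args = dict((executor.submit(big_calculation, args), args)
                          for args in calc_args)
    # ... the while/as_completed loop above stays exactly the same ...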

– jfs
  • WOW this is a HUGE eye-opener. Thanks a lot for introducing me to concurrent.futures. And it works very well with Python 2.7 and numpy and scipy. None of the hassles of threading.Thread and all of the concurrent execution benefits – harijay Oct 09 '11 at 12:20

To elaborate on my comment - if the sole purpose of your threads is to consume values from a Queue and perform a function on them, you're decidedly better off doing something like this, IMHO:

from Queue import Queue  # Python 2; "queue" in Python 3
from threading import Thread

q = Queue()
results = []

def worker():
  while True:
    x, y = q.get()
    results.append(x ** y)
    q.task_done()

for _ in range(workerCount):
  t = Thread(target = worker)
  t.daemon = True
  t.start()

for tup in listOfXYs:
  q.put(tup)

q.join()

# Some more code here with the results list.

q.join() will block until every item that has been put on the queue has been marked complete with q.task_done(). The worker threads will continue to attempt to retrieve values, but won't find any, so they'll wait indefinitely once the queue is empty. When your script finishes its execution later, the worker threads will die because they're marked as daemon threads.

– g.d.d.c
  • Instead of using daemons for this stuff (imo not a nice design for that situation, YMMV), you could use sentinel values. I.e. after all jobs are finished, put `nrThreads` sentinel values into the queue and then join the queue or the threads again. The threads just check if `get()` returned the sentinel (None is a good choice usually) and stop in that case. That also makes it easier to include the code in a larger design. – Voo Oct 06 '11 at 22:03
  • @Voo: The worker threads themselves are putting new values in `in_queue`. If the main thread puts sentinels in `in_queue`, they may signal termination prematurely. How would you handle this situation? – unutbu Oct 06 '11 at 22:12
  • @unutbu - I don't personally see the advantage to the sentinel values, but you could (theoretically) address that concern by using a LifoQueue in place of a standard queue and pre-populate it with a sentinel value for each worker thread. This does carry the potential (in the op's case at least) that some of your workers die off early, but that a final worker that re-adds to the in_queue several times ends up running significantly longer. A daemon thread in a blocked `queue.get()` consumes little to no resources and isn't a performance drain in my experience. – g.d.d.c Oct 06 '11 at 22:18
  • @g.d.d.c: I like your idea of using a LifoQueue. I think that could be workable. But there is still one other problem: how to know when `out_queue` is empty. I don't think testing `qsize` is safe -- a thread may be about to `put` a new item in `out_queue` while the main thread is testing `qsize` when it is temporarily zero. – unutbu Oct 06 '11 at 22:27
  • @unutbu - Additionally, you could call `q.join()`, wait for all values to process, then `q.put(sentinel)` for the number of workers, then `q.join()` again. I don't seem to recall there being any restriction on joining a queue multiple times, especially if you've added values again. – g.d.d.c Oct 06 '11 at 22:29
  • @g.d.d.c: Ah, yes, and after the sentinel is sent to the workers, they can put a sentinel in `out_queue` just before they terminate. Then the main thread can know it has consumed `out_queue` after it gets `NUM_THREADS` sentinels. Great :) – unutbu Oct 06 '11 at 22:44
  • I obviously didn't explain the concept well enough, but yep, g.d.d.c got it right: wait, put sentinels in the queue, wait again (though you could wait on the threads/processes/threadpool/whatever the second time as well; not exactly the same semantics but close enough). It's quite a useful pattern for this kind of problem - it can get more complicated with several input queues, output queues, etc., but that's the nature of the beast and we can generalize the solution to that as well. A sketch of this sentinel pattern follows the comment thread. – Voo Oct 06 '11 at 22:56
  • Also the "problem" with daemon threads in my opinion (obviously completely subjective) is, that you basically have to know that the workers are always daemons to make sense of it - that can be confusing in my experience, but some comment describing that approach would clear that up as well. Having multiple processes waiting, takes up resources that can be better spent - python processes are somewhat lightweight but still use quite some memory. – Voo Oct 06 '11 at 22:59
  • Thanks for all the discussion. I can use g.d.d.c's method of thread execution provided I put a time.sleep() into the for loop that spawns the worker threads. Without that the interpreter returns no answer and raises no error – harijay Oct 07 '11 at 01:16
  • This works: `for aworker in range(5): t = Thread(target=worker); t.daemon = True; t.start(); time.sleep(1); print "Started"` – harijay Oct 07 '11 at 01:20
  • @harijay - depending on how you restructure your code you may be able to avoid the need for the `sleep()` call by moving the `in_queue.put()` commands up above the thread generation. I tend to try to avoid using arbitrary sleep operations when I can avoid it, as they tend to indicate the potential for a race condition later, but otherwise that all sounds good. I'm glad to have helped. :) – g.d.d.c Oct 07 '11 at 02:29
  • @g.d.d.c I have tried various ways of implementing this, i.e. in_queue.put() before and after the thread, with and without sleep statements. No matter what I do (even with the sleep() statements) I cannot seem to have multiple threads churn through the 384-long list of 2400 x,y pairs. In every case it stops prematurely and at different points and throws no error. I can see how this is tricky. But I think numpy and scipy may have something to do with the trickiness. – harijay Oct 07 '11 at 10:21
  • @harijay - that's very odd indeed. I've not spent enough time with numpy and scipy to know how or why they might be influencing the workflow. I've never encountered situations where it makes it partway through but does not complete. – g.d.d.c Oct 07 '11 at 16:11
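A minimal sketch of the sentinel pattern discussed in the comments above (SENTINEL and worker are placeholder names; big_calculation, well_ids and my_data_dict are borrowed from the question, so treat this as an outline rather than tested code):

# Sketch: the first join() waits for all real work (including retries the
# workers re-queue), then one sentinel per worker tells each thread to exit;
# each worker acknowledges via out_queue before terminating.
from Queue import Queue
from threading import Thread

NUM_THREADS = 2
SENTINEL = None  # never a real work item

in_queue = Queue()
out_queue = Queue()

def worker():
    while True:
        params = in_queue.get()
        if params is SENTINEL:
            out_queue.put(SENTINEL)  # tell the consumer this worker is done
            in_queue.task_done()
            break
        try:
            out_queue.put(big_calculation(params))
        except ValueError:
            params["window"] += 1    # modify params and retry
            in_queue.put(params)
        finally:
            in_queue.task_done()

for _ in range(NUM_THREADS):
    Thread(target=worker).start()

for i in well_ids:
    in_queue.put(dict(well_id=i, window=10, data_arrays=my_data_dict))

in_queue.join()                      # all real items (and retries) are done
for _ in range(NUM_THREADS):
    in_queue.put(SENTINEL)           # now safe: no worker will re-queue work
in_queue.join()

done_workers = 0
while done_workers < NUM_THREADS:    # drain results until every worker signed off
    result = out_queue.get()
    if result is SENTINEL:
        done_workers += 1
    else:
        print result

Because the workers exit on their own once they see a sentinel, no daemon flag and no private _Thread__stop() call is needed.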

I tried g.d.d.c's method and it produced an interesting result. I could get his exact x**y calculation to work just fine spread across the threads.

When I called my function inside the worker's `while True` loop, I could perform the calculations across multiple threads only if I put a time.sleep(1) in the for loop that calls each thread's start() method.

So in my code, without the time.sleep(1), the program gave me either a clean exit with no output, or in some cases:

"Exception in thread Thread-2 (most likely raised during interpreter shutdown):Exception in thread Thread-1 (most likely raised during interpreter shutdown):"

Once I added the time.sleep() everything ran fine.

for aworker in range(5):
    t = Thread(target = worker)
    t.daemon = True
    t.start()
    # This sleep was essential or results for my specific function were None
    time.sleep(1)
    print "Started"
– harijay