9

I'm doing a project involving data collection and logging. I have 2 threads running, a collection thread and a logging thread, both started in main. I'm trying to allow the program to be terminated gracefully with Ctrl-C.

I'm using a threading.Event to signal to the threads to end their respective loops. It works fine to stop the sim_collectData method, but it doesn't seem to be properly stopping the logData thread. The "Collection terminated" print statement is never executed, and the program just stalls (it doesn't end, it just sits there).

The second while loop in logData is to make sure everything in the queue is logged. The goal is for Ctrl-C to stop the collection thread immediately, then allow the logging thread to finish emptying the queue, and only then fully terminate the program. (Right now, the data is just being printed out - eventually it's going to be logged to a database).

I don't understand why the second thread never terminates. I'm basing what I've done on this answer: Stopping a thread after a certain amount of time. What am I missing?

import threading
import Queue
import random
import time

def sim_collectData(input_queue, stop_event):
    ''' this provides some output simulating the serial
    data from the data logging hardware. 
    '''
    n = 0
    while not stop_event.is_set():
        input_queue.put("DATA: <here are some random data> " + str(n))
        stop_event.wait(random.randint(0,5))
        n += 1
    print "Terminating data collection..."
    return

def logData(input_queue, stop_event):
    n = 0

    # we *don't* want to loop based on queue size because the queue could
    # theoretically be empty while waiting on some data.
    while not stop_event.is_set():
        d = input_queue.get()
        if d.startswith("DATA:"):
            print d
        input_queue.task_done()
        n += 1

    # if the stop event is received and the previous loop terminates,
    # finish logging the rest of the items in the queue.
    print "Collection terminated. Logging remaining data to database..."
    while not input_queue.empty():
        d = input_queue.get()
        if d.startswith("DATA:"):
            print d
        input_queue.task_done()
        n += 1
    return


def main():
    input_queue = Queue.Queue()

    stop_event = threading.Event() # used to signal termination to the threads

    print "Starting data collection thread...",
    collection_thread = threading.Thread(target=sim_collectData, args=(input_queue, stop_event))
    collection_thread.start()
    print "Done."

    print "Starting logging thread...",
    logging_thread = threading.Thread(target=logData, args=(input_queue, stop_event))
    logging_thread.start()
    print "Done."

    try:
        while True:
            time.sleep(10)
    except (KeyboardInterrupt, SystemExit):
        # stop data collection. Let the logging thread finish logging everything in the queue
        stop_event.set()

main()
Isaac Dontje Lindell

4 Answers

10

The problem is that your logger is blocked in d = input_queue.get() and therefore never gets back to checking the event. One solution is to drop the event from the logger completely and use a unique message (here, None) that tells the logger to stop. When the collection thread gets the stop signal, it puts that message on the queue.

import threading
import Queue
import random
import time

def sim_collectData(input_queue, stop_event):
    ''' this provides some output simulating the serial
    data from the data logging hardware. 
    '''
    n = 0
    while not stop_event.is_set():
        input_queue.put("DATA: <here are some random data> " + str(n))
        stop_event.wait(random.randint(0,5))
        n += 1
    print "Terminating data collection..."
    input_queue.put(None)
    return

def logData(input_queue):
    n = 0

    # we *don't* want to loop based on queue size because the queue could
    # theoretically be empty while waiting on some data.
    while True:
        d = input_queue.get()
        if d is None:
            input_queue.task_done()
            return
        if d.startswith("DATA:"):
            print d
        input_queue.task_done()
        n += 1

def main():
    input_queue = Queue.Queue()

    stop_event = threading.Event() # used to signal termination to the threads

    print "Starting data collection thread...",
    collection_thread = threading.Thread(target=sim_collectData, args=(input_queue, stop_event))
    collection_thread.start()
    print "Done."

    print "Starting logging thread...",
    logging_thread = threading.Thread(target=logData, args=(input_queue,))
    logging_thread.start()
    print "Done."

    try:
        while True:
            time.sleep(10)
    except (KeyboardInterrupt, SystemExit):
        # stop data collection. Let the logging thread finish logging everything in the queue
        stop_event.set()

main()
tdelaney
  • Notice that in the end you have to put as many `None` into the queue as there are blocked threads. – bodo Sep 10 '13 at 09:08
  • @canaaerus - that is a very good point. In this case there is only 1 worker thread but calling out the fact that N worker threads need N termination messages is a nice addition. – tdelaney Sep 10 '13 at 15:32
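
The point in the comments above (N worker threads need N termination messages) can be illustrated with a minimal sketch. It is written for Python 3, where the module is named queue rather than Queue. The names worker and run_workers and the results list are hypothetical, introduced only for this illustration, and the list stands in for the real logging step.

```python
import queue
import threading

def worker(input_queue, results):
    """Consume items until a None termination message arrives."""
    while True:
        item = input_queue.get()
        try:
            if item is None:
                return  # each worker consumes exactly one termination message
            results.append(item)  # stand-in for the real logging work
        finally:
            input_queue.task_done()

def run_workers(items, num_workers=3):
    """Hypothetical helper: start workers, feed them, then shut them down."""
    input_queue = queue.Queue()
    results = []
    threads = [threading.Thread(target=worker, args=(input_queue, results))
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    for item in items:
        input_queue.put(item)
    # One None per worker: with fewer, some worker would stay blocked in get().
    for _ in threads:
        input_queue.put(None)
    for t in threads:
        t.join()
    return results
```

Because the queue is FIFO, the termination messages are only reached after all real items have been handed out, so nothing queued before shutdown is lost.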
2

I'm not an expert in threading, but in your logData function the first d = input_queue.get() is blocking, i.e., if the queue is empty it will sit and wait until a queue message is received. This is likely why the logData thread never terminates: it's waiting forever for a queue message.

Refer to the Python docs to change this to a non-blocking queue read: use .get(False) or .get_nowait(). Either one raises Queue.Empty when the queue is empty, so you will need to handle that exception.

Raceyman
1

You are calling a blocking get on your input_queue with no timeout. In either section of logData, if you call input_queue.get() and the queue is empty, it will block indefinitely, preventing the logging_thread from reaching completion.

To fix, you will want to call input_queue.get_nowait() or pass a timeout to input_queue.get().

Here is my suggestion:

def logData(input_queue, stop_event):
    n = 0

    while not stop_event.is_set():
        try:
            d = input_queue.get_nowait()
            if d.startswith("DATA:"):
                print "LOG: " + d
                n += 1
        except Queue.Empty:
            time.sleep(1)
    return

You are also signalling the threads to terminate, but not waiting for them to do so. Consider doing this in your main function.

try:
    while True:
        time.sleep(10)
except (KeyboardInterrupt, SystemExit):
    stop_event.set()
    collection_thread.join()
    logging_thread.join()
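
The other option mentioned above, passing a timeout to input_queue.get(), might look like the following minimal sketch. It is written for Python 3 (queue instead of Queue). The names log_data, logged and poll_interval are assumptions made for this illustration, and the list append stands in for the real database write. Unlike the get_nowait() loop, this version only exits once the stop event is set and the queue has been found empty, so items still queued at shutdown are logged.

```python
import queue
import threading

def log_data(input_queue, stop_event, logged, poll_interval=0.1):
    """Log items until stop_event is set AND the queue has been drained."""
    while True:
        try:
            # Block for at most poll_interval seconds instead of forever.
            d = input_queue.get(timeout=poll_interval)
        except queue.Empty:
            if stop_event.is_set():
                return  # nothing left to log and we were asked to stop
            continue    # queue is just temporarily empty; keep waiting
        if d.startswith("DATA:"):
            logged.append(d)  # stand-in for the real database write
        input_queue.task_done()
```

This sketch assumes the collection thread has stopped putting items by the time the logger sees an empty queue. If that cannot be guaranteed, joining the collection thread first and signalling the logger with a separate event would be a safer ordering.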
rrhartjr
0

Based on tdelaney's answer, I created an iterator-based approach. The iterator exits when the termination message is encountered. I also added a counter of how many get calls are currently blocking, and a stop method that sends exactly that many termination messages. To prevent a race condition between incrementing and reading the counter, I set a stopping flag there. Furthermore, I don't use None as the termination message, because it cannot necessarily be compared to other data types when using a PriorityQueue.

There are two restrictions that I had no need to eliminate. First, the stop method waits until the queue is empty before shutting down the threads. Second, I did not write any code to make the queue reusable after stop. The latter can probably be added quite easily, while the former requires being careful about concurrency and the context in which the code is used.

You have to decide whether you want stop to also wait for all the termination messages to be consumed. I chose to put the necessary join there, but you can simply remove it.

So this is the code:

import threading, queue

from functools import total_ordering
@total_ordering
class Final:
    def __repr__(self):
        return "∞"

    def __lt__(self, other):
        return False

    def __eq__(self, other):
        return isinstance(other, Final)

Infty = Final()

class IterQueue(queue.Queue):
    def __init__(self):
        self.lock = threading.Lock()
        self.stopped = False
        self.getters = 0
        super().__init__()

    def __iter__(self):
        return self

    def get(self):
        raise NotImplementedError("This queue may only be used as an iterator.")

    def __next__(self):
        with self.lock:
            if self.stopped:
                raise StopIteration
            self.getters += 1
        data = super().get()
        if data == Infty:
            self.task_done()
            raise StopIteration
        with self.lock:
            self.getters -= 1
        return data

    def stop(self):
        self.join()
        self.stopped = True
        with self.lock:
            for i in range(self.getters):
                self.put(Infty)
        self.join()

class IterPriorityQueue(IterQueue, queue.PriorityQueue):
    pass

Oh, and I wrote this in Python 3.2. After backporting to Python 2, it looks like this:

import threading, Queue

from functools import total_ordering
@total_ordering
class Final:
    def __repr__(self):
        return "Infinity"

    def __lt__(self, other):
        return False

    def __eq__(self, other):
        return isinstance(other, Final)

Infty = Final()

class IterQueue(Queue.Queue, object):
    def __init__(self):
        self.lock = threading.Lock()
        self.stopped = False
        self.getters = 0
        super(IterQueue, self).__init__()

    def __iter__(self):
        return self

    def get(self):
        raise NotImplementedError("This queue may only be used as an iterator.")

    def next(self):
        with self.lock:
            if self.stopped:
                raise StopIteration
            self.getters += 1
        data = super(IterQueue, self).get()
        if data == Infty:
            self.task_done()
            raise StopIteration
        with self.lock:
            self.getters -= 1
        return data

    def stop(self):
        self.join()
        self.stopped = True
        with self.lock:
            for i in range(self.getters):
                self.put(Infty)
        self.join()

class IterPriorityQueue(IterQueue, Queue.PriorityQueue):
    pass

You would use it like this:

import random
import time

def sim_collectData(input_queue, stop_event):
    ''' this provides some output simulating the serial
    data from the data logging hardware. 
    '''
    n = 0
    while not stop_event.is_set():
        input_queue.put("DATA: <here are some random data> " + str(n))
        stop_event.wait(random.randint(0,5))
        n += 1
    print "Terminating data collection..."
    return

def logData(input_queue):
    n = 0

    # we *don't* want to loop based on queue size because the queue could
    # theoretically be empty while waiting on some data.
    for d in input_queue:
        if d.startswith("DATA:"):
            print d
        input_queue.task_done()
        n += 1

def main():
    input_queue = IterQueue()

    stop_event = threading.Event() # used to signal termination to the threads

    print "Starting data collection thread...",
    collection_thread = threading.Thread(target=sim_collectData, args=(input_queue, stop_event))
    collection_thread.start()
    print "Done."

    print "Starting logging thread...",
    logging_thread = threading.Thread(target=logData, args=(input_queue,))
    logging_thread.start()
    print "Done."

    try:
        while True:
            time.sleep(10)
    except (KeyboardInterrupt, SystemExit):
        # stop data collection. Let the logging thread finish logging everything in the queue
        stop_event.set()
        input_queue.stop()

main()
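
The reasoning above for not using None as the termination message with a PriorityQueue can be checked with a small sketch, written for Python 3. The helper drain_in_priority_order is hypothetical and exists only for this illustration. The Final class mirrors the one above. In Python 3, putting None into a PriorityQueue alongside strings raises TypeError as soon as the two are compared, whereas a sentinel that is never "less than" anything always sorts last.

```python
import queue
from functools import total_ordering

@total_ordering
class Final:
    """Sentinel that compares greater than every non-Final value."""
    def __lt__(self, other):
        return False

    def __eq__(self, other):
        return isinstance(other, Final)

def drain_in_priority_order(items):
    """Hypothetical helper: return items in priority order, stopping at the sentinel."""
    pq = queue.PriorityQueue()
    pq.put(Final())  # enqueued first on purpose; it should still come out last
    for item in items:
        pq.put(item)
    out = []
    while True:
        item = pq.get()
        if item == Final():
            return out
        out.append(item)
```

Even though the sentinel is enqueued before the data, all real items are dequeued ahead of it, which is what lets stop() rely on it as the last message each consumer sees.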
bodo