
I have a producer thread that reads data from a serial connection and puts it into multiple queues, each of which is read by a different consumer thread. However, I'd like to be able to add queues (for additional consumers) from the main thread after the producer thread has already started running.

That is, in the code below, how could I add a Queue to listOfQueues from the main thread while this thread is running? Can I add a method such as addQueue(newQueue) to this class that appends to listOfQueues? This doesn't seem likely to work, as the thread will be inside the run method. Or can I create some sort of Event, similar to the stop event?

import threading

import serial  # pyserial


class ProducerThread(threading.Thread):
    def __init__(self, listOfQueues):
        super(ProducerThread, self).__init__()
        self.listOfQueues = listOfQueues
        self._stop_event = threading.Event() # Flag to be set when the thread should stop

    def run(self):
        ser = serial.Serial() # Some serial connection 

        while not self.stopped():
            try:
                bytestring = ser.readline() # Serial connection or "producer" at some rate
                for q in self.listOfQueues:
                    q.put(bytestring)
            except serial.SerialException:
                continue

    def stop(self):
        '''
        Call this function to stop the thread. Must also use .join() in the main
        thread to fully ensure the thread has completed.
        :return: 
        '''
        self._stop_event.set()

    def stopped(self):
        '''
        Call this function to determine whether the thread has been asked to stop.
        :return: boolean True or False
        '''
        return self._stop_event.is_set()

1 Answer


Sure, you can simply add a method to the class that appends to your list, e.g.:

def append(self, element):
    self.listOfQueues.append(element)

That will work even after your thread's start() method has been called.
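To make this concrete, here is a minimal sketch of the class with such a method, assuming CPython (where list.append is atomic under the GIL). The read_line method is a hypothetical stand-in for ser.readline() so that the example runs without a serial port, and demo() is only an illustration. Because list iteration is index-based, appending while the for loop is running does not raise an error; the new queue simply starts receiving items from the current or the next pass.

```python
import queue
import threading
import time


class ProducerThread(threading.Thread):
    def __init__(self, listOfQueues):
        super().__init__()
        self.listOfQueues = listOfQueues
        self._stop_event = threading.Event()

    def append(self, element):
        # Called from the main thread while run() is looping.
        self.listOfQueues.append(element)

    def read_line(self):
        # Hypothetical stand-in for ser.readline(); not part of the original code.
        time.sleep(0.01)
        return b"data\n"

    def run(self):
        while not self._stop_event.is_set():
            bytestring = self.read_line()
            for q in self.listOfQueues:
                q.put(bytestring)

    def stop(self):
        self._stop_event.set()


def demo():
    first = queue.Queue()
    producer = ProducerThread([first])
    producer.start()

    late = queue.Queue()
    producer.append(late)        # added after start()
    item = late.get(timeout=2)   # blocks until the producer serves the new queue

    producer.stop()
    producer.join()
    return item


if __name__ == "__main__":
    print(demo())
```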

Edit: for operations that are not thread-safe, you can use a lock, e.g.:

def unsafe(self, element):
    with self.lock:
        pass  # do stuff

You would then also need to acquire the same lock inside your run method (this assumes the lock is created in __init__, e.g. self.lock = threading.Lock()), e.g.:

with self.lock:
    for q in self.listOfQueues:
        q.put(bytestring)

Any code that tries to acquire the lock will block until the code currently holding it releases it.
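One case where a lock is genuinely needed is removing a queue, since removing from a list while another thread iterates over it can cause an element to be skipped. The sketch below is one possible arrangement, not the only one: QueueRegistry and its method names are hypothetical, and it copies the list under the lock so that q.put() runs outside the lock (which matters if a queue is bounded and put() can block).

```python
import queue
import threading


class QueueRegistry:
    """Hypothetical helper: a lock-protected list of consumer queues."""

    def __init__(self):
        self._lock = threading.Lock()
        self._queues = []

    def add(self, q):
        with self._lock:
            self._queues.append(q)

    def remove(self, q):
        with self._lock:
            self._queues.remove(q)

    def snapshot(self):
        # Copy under the lock so the caller can iterate without holding it.
        with self._lock:
            return list(self._queues)

    def broadcast(self, item):
        # The producer's run() loop would call this once per line read.
        for q in self.snapshot():
            q.put(item)
```

In the producer, the inner for loop would then become a single call such as registry.broadcast(bytestring), while the main thread calls registry.add(...) or registry.remove(...) at any time.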

101
  • Thank you. Since there is the GIL, and by default the thread switches every 100 instructions, is there a case where this thread stops in the middle of the for loop, a queue is added to self.listOfQueues in the main thread, and it causes a problem because the producer thread is in the middle of the loop? – Thomas Jacobs Oct 22 '17 at 20:16
  • No, as `append` is thread safe: https://stackoverflow.com/a/18568017/1470749. If you wish to do more complex procedures that aren't thread safe, then you can do so by using a lock (see edit). – 101 Oct 22 '17 at 20:22
  • @ThomasJacobs Then add the new `Queue` to a separate list, `new_queues`, and append the contents of `new_queues` to `listOfQueues` after the `for` loop in the `run` method. – Roland Smith Oct 22 '17 at 20:22
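The separate-list idea from the last comment could be sketched roughly as follows. The class name Fanout and the methods add_queue and publish are hypothetical, and the lock around the hand-over is an added assumption (the comment does not mention one); it is there so that a queue appended between reading and resetting new_queues cannot be lost.

```python
import queue
import threading


class Fanout:
    """Hypothetical sketch: new queues wait in new_queues until a pass ends."""

    def __init__(self, listOfQueues):
        self.listOfQueues = list(listOfQueues)
        self.new_queues = []
        self._lock = threading.Lock()

    def add_queue(self, q):
        # Called from the main thread; never touches listOfQueues directly.
        with self._lock:
            self.new_queues.append(q)

    def publish(self, bytestring):
        # Called from the producer thread once per line read.
        for q in self.listOfQueues:
            q.put(bytestring)
        # After the loop, move any pending queues into the main list.
        with self._lock:
            pending, self.new_queues = self.new_queues, []
        self.listOfQueues.extend(pending)
```

With this arrangement only the producer thread ever modifies listOfQueues, so the for loop never sees the list change underneath it; a newly added queue starts receiving data on the pass after it was registered.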