22

I have a class that is run in separate threads in my application. I can have multiple threads running at a time and the threads are daemons. After a period of time, some of these threads need to receive and process a message. How do I do this?

A sample of my code looks like this:

import threading
import time 

class MyThread(threading.Thread):
    def __init__(self, args=(), kwargs=None):
        threading.Thread.__init__(self, args=(), kwargs=None)
        self.daemon = True
        self.receive_messages = args[0]

    def run(self):
        print threading.currentThread().getName(), self.receive_messages

    def do_thing_with_message(self, message):
        if self.receive_messages:
            print threading.currentThread().getName(), "Received %s".format(message)

if __name__ == '__main__':
    threads = []
    for t in range(10):
        threads.append( MyThread(args=(t % 2 == 0,)))
        threads[t].start()
        time.sleep(0.1)

    for t in threads:
        t.do_thing_with_message("Print this!")

This outputs:

Thread-1 True
Thread-2 False
Thread-3 True
Thread-4 False
Thread-5 True
Thread-6 False
Thread-7 True
Thread-8 False
Thread-9 True
Thread-10 False
MainThread Received %s
MainThread Received %s
MainThread Received %s
MainThread Received %s
MainThread Received %s

I am expecting, however, those last five lines to not be related to the MainThread, and instead of %s, I'd expect it to me Print this!, like so:

Thread-1 True
Thread-2 False
Thread-3 True
Thread-4 False
Thread-5 True
Thread-6 False
Thread-7 True
Thread-8 False
Thread-9 True
Thread-10 False
Thread-1 Received Print this!
Thread-3 Received Print this!
Thread-5 Received Print this!
Thread-7 Received Print this!
Thread-9 Received Print this!

How can I properly send a message like this to the running threads?

Addendum:

If I have this block after the Print this! block, and utilize @dano's code to solve the problem above, it does not seem to respond to these new messages.

for t in threads:
    t.queue.put("Print this again!")
    time.sleep(0.1)

In this case, I'd expect the end of my output to look like this

Thread-1 Received Print this!
Thread-3 Received Print this!
Thread-5 Received Print this!
Thread-7 Received Print this!
Thread-9 Received Print this!
Thread-1 Received Print this again!
Thread-3 Received Print this again!
Thread-5 Received Print this again!
Thread-7 Received Print this again!
Thread-9 Received Print this again!
Python Novice
  • 1,980
  • 5
  • 26
  • 33
  • This is what [task queues](http://www.celeryproject.org/) are designed for. – Burhan Khalid Sep 18 '14 at 05:12
  • 3
    @BurhanKhalid I would say that using something like Celery would be overkill here. Celery is definitely useful for distributing work to multiple processes or multiple machines in a cluster, but something much simpler will suit the OP's needs here. – dano Sep 18 '14 at 05:23

1 Answers1

29

You can use a Queue.Queue (or queue.Queue in Python 3) for this:

import threading
import time 
from Queue import Queue

print_lock = threading.Lock()

class MyThread(threading.Thread):
    def __init__(self, queue, args=(), kwargs=None):
        threading.Thread.__init__(self, args=(), kwargs=None)
        self.queue = queue
        self.daemon = True
        self.receive_messages = args[0]

    def run(self):
        print threading.currentThread().getName(), self.receive_messages
        val = self.queue.get()
        self.do_thing_with_message(val)

    def do_thing_with_message(self, message):
        if self.receive_messages:
            with print_lock:
                print threading.currentThread().getName(), "Received {}".format(message)

if __name__ == '__main__':
    threads = []
    for t in range(10):
        q = Queue()
        threads.append(MyThread(q, args=(t % 2 == 0,)))
        threads[t].start()
        time.sleep(0.1)

    for t in threads:
        t.queue.put("Print this!")

    for t in threads:
        t.join()

We pass a Queue instance to each thread, and send our message to the Thread using queue.put. We wait for the message to arrive in the run method, which is the part of the Thread object that's actually running in a separate thread of execution. Once we get the message, we call do_thing_with_message, which will run in the same background thread.

I've also added a threading.Lock to the code so the prints to stdout don't get mixed up.

Edit:

If you want to be able to deliver multiple messages to the thread, just use a loop:

def run(self):
    print threading.currentThread().getName(), self.receive_messages
    while True:
        val = self.queue.get()
        if val is None:   # If you send `None`, the thread will exit.
            return
        self.do_thing_with_message(val)
dano
  • 91,354
  • 19
  • 222
  • 219
  • This is great! It solves the problem I asked, but perhaps not entirely how I was expecting. If I add another `Print this!` loop (or any number of loops because I'm expecting the be able to send messages to these threads as long as they live), only the first message to each thread prints. How can it be done to accept any number of messages? (If this is to much, I can ask a separate question and mark this as the answer of my original question) – Python Novice Sep 18 '14 at 05:35
  • @PythonNoob Just put a `while True:` loop around the `val = self.queue.get() ; self.do_thing_with_message(val)` calls. – dano Sep 18 '14 at 05:49
  • @PythonNoob I've edited my answer to demonstrate how to receive multiple messages, and also how you could signal the message receiving loop to end from the main thread, by putting a sentinel value (`None`) into the `Queue`. – dano Sep 18 '14 at 05:52