There is no true concurrency in Python (CPython) except via the multiprocessing module, because with separate processes the GIL is no longer part of the picture.
What you want to do requires an event loop in which you check an event queue and dispatch as appropriate. PyPubSub can likely make your life easier, but it might be overkill for what you want (as the author of pypubsub I feel comfortable saying that :). Given how seamlessly the mp module integrates multiple processes, is there really a reason not to use it if concurrency is what you need?
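To make the event-loop idea concrete, here is a minimal stdlib-only sketch: a loop pulls events off a queue and dispatches them to handlers keyed by event type. The function name, the 'keypress'/'quit' event types, and the handler table are made up for illustration:

```python
from queue import Queue, Empty

def run_event_loop(event_queue, handlers):
    """Check the event queue and dispatch each event to its handlers."""
    while True:
        try:
            event = event_queue.get(timeout=1)  # wait up to 1 second
        except Empty:
            continue  # nothing arrived; keep looping
        if event['type'] == 'quit':
            break
        for handler in handlers.get(event['type'], ()):
            handler(event)

# hypothetical usage:
received = []
q = Queue()
handlers = {'keypress': [lambda evt: received.append(evt['val'])]}
q.put(dict(type='keypress', val='a'))
q.put(dict(type='quit'))
run_event_loop(q, handlers)
print(received)  # ['a']
```

The same loop shape works unchanged with a multiprocessing.Queue, which is what the example further down does.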
The fact that you want an event to go from any thread to one or more other threads suggests using a shared "post office" Queue that any thread can post to, with data indicating the event type and the event payload. You would also have a message Queue for each thread: threads post to the shared post Queue, and the main process's event loop checks the post Queue and copies events to the individual thread message Queues as appropriate. Each thread has to check its own queue regularly, process events, and remove each processed event. Each thread could subscribe with the main process for specific event types.
Here is an example with three auxiliary processes (named "threads" in the code, in the spirit of the question) that send messages to each other:
    from multiprocessing import Process, Queue, Lock
    from queue import Empty as QueueEmpty
    from random import randint

    def log(lock, threadId, msg):
        with lock:
            print('Thread', threadId, ':', msg)

    def auxThread(id, lock, sendQueue, recvQueue, genType):
        log(lock, id, 'starting')
        while True:
            # send a message (once in a while!)
            if randint(1, 10) > 7:
                event = dict(type=genType, fromId=id, val=randint(1, 10))
                log(lock, id, 'putting message type "%(type)s" = %(val)s' % event)
                sendQueue.put(event)
            # block until we get a message, but give up after maxWait:
            maxWait = 1  # second
            try:
                msg = recvQueue.get(True, maxWait)
                log(lock, id, 'got message type "%(type)s" = %(val)s from thread %(fromId)s' % msg)
                if msg['val'] == 'DONE':
                    break
            except QueueEmpty:
                pass
        log(lock, id, 'done')

    def createThread(id, lock, postOffice, genType):
        messagesForAux = Queue()
        args = (id, lock, postOffice, messagesForAux, genType)
        auxProc = Process(target=auxThread, args=args)
        auxProc.daemon = True
        return dict(q=messagesForAux, p=auxProc, id=id)

    def mainThread():
        postOffice = Queue()  # where all threads post their messages
        lock = Lock()         # so print output can be synchronized

        # setup threads:
        msgThreads = [
            createThread(1, lock, postOffice, 'heartbeat'),
            createThread(2, lock, postOffice, 'new_socket'),
            createThread(3, lock, postOffice, 'keypress'),
        ]

        # identify which threads listen for which messages:
        dispatch = dict(
            heartbeat=(2,),
            keypress=(1,),
            new_socket=(3,),
        )

        # start all threads:
        for th in msgThreads:
            th['p'].start()

        # process messages:
        count = 0
        while True:
            try:
                maxWait = 1  # second
                msg = postOffice.get(True, maxWait)
                for threadId in dispatch[msg['type']]:
                    thObj = msgThreads[threadId - 1]
                    thObj['q'].put(msg)
                count += 1
                if count > 20:
                    break
            except QueueEmpty:
                pass

        log(lock, 0, "Main thread sending exit signal to aux threads")
        for th in msgThreads:
            th['q'].put(dict(type='command', val='DONE', fromId=0))
        for th in msgThreads:
            th['p'].join()
            log(lock, th['id'], 'joined main')
        log(lock, 0, "DONE")

    if __name__ == '__main__':
        mainThread()
You are entirely right that this description shares similarities with pypubsub's functionality, but you would be using only a small part of pypubsub. I think most of the complexity in your endeavor lies in the two types of queues; pypubsub won't help much with that part of the problem. Once you have the queue system working with the mp module (as per my example), you can bring in pypubsub and post/queue its messages rather than your own implementation of an event.