In the following code I have two queues for running different kinds of worker threads, and the threads feed each other's queues recursively: threads on queue 1 grab some info, threads on queue 2 process it and add more work back onto queue 1. I want to wait until all items in both queues are fully processed. Currently I'm using this code:
queue.join()
out_queue.join()
The problem is that when the first queue temporarily runs out of work, its join() returns, so it never sees what queue 2 (the out_queue) adds back to it after that point.
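Here is a minimal repro of the race I think I'm hitting (a toy example, not my real code; the names are made up):

import Queue
import threading
import time

q = Queue.Queue()

def late_feeder():
    time.sleep(1)        # simulates the out_queue thread adding work later
    q.put('late item')   # arrives after join() below has already returned

q.put('first item')
feeder = threading.Thread(target=late_feeder)
feeder.setDaemon(True)
feeder.start()

q.get()
q.task_done()            # unfinished task count drops to zero...
q.join()                 # ...so join() returns here, missing the late item
print 'join() returned before the late item was ever queued'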
To work around it I added a time.sleep() call, which is a very hacky fix: within 30 seconds both queues have filled up enough that neither runs dry.
What is the standard Python way of fixing this? Do I have to use just one queue and tag the items in it according to which kind of thread should handle them (roughly the sketch below)?
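For reference, this is what I mean by the single-queue idea, as a rough sketch (handle_fetch and handle_process are hypothetical stand-ins for my real processing):

import Queue

work = Queue.Queue()

def worker():
    while True:
        tag, payload = work.get()   # each item is tagged with the stage that owns it
        try:
            if tag == 'fetch':
                # new work is put() before task_done(), so the queue's
                # unfinished count never hits zero while work is still pending
                work.put(('process', handle_fetch(payload)))    # hypothetical helper
            elif tag == 'process':
                for newrow in handle_process(payload):          # hypothetical helper
                    work.put(('fetch', newrow))
        finally:
            work.task_done()

If I understand Queue.join() correctly, with a single queue it can only return once every item, including the recursively added ones, has been marked done, which is the behavior I want.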
import threading
import time
import urllib2
import Queue

# req_headers, URL_THREAD_COUNT, rows and MySQLConn come from setup code omitted here

queue = Queue.Queue()
out_queue = Queue.Queue()
class ThreadUrl(threading.Thread):
    """Threaded URL grab: fetch row[0] and hand the result to out_queue."""
    def __init__(self, queue, out_queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue

    def run(self):
        while True:
            row = self.queue.get()
            request = urllib2.Request(row[0], None, req_headers)
            # ... some processing that produces http_status and page ...
            self.out_queue.put([row, http_status, page])
            self.queue.task_done()
class DatamineThread(threading.Thread):
    """Process fetched pages and push new rows back onto the first queue."""
    def __init__(self, out_queue, mysql):
        threading.Thread.__init__(self)
        self.out_queue = out_queue
        self.mysql = mysql

    def run(self):
        while True:
            row = self.out_queue.get()
            # ... some processing that derives newrow ...
            queue.put(newrow)  # recursive step: feeds the first queue again
            self.out_queue.task_done()
for i in range(URL_THREAD_COUNT):
    t = ThreadUrl(queue, out_queue)
    t.setDaemon(True)
    t.start()

# populate queue with data
for row in rows:
    queue.put(row)
# MySQL connector
mysql = MySQLConn(host='localhost', user='root', passwd=None, db='db')

# spawn DatamineThread; if you have multiple, make sure each one has its own MySQL connector
dt = DatamineThread(out_queue, mysql)
dt.setDaemon(True)
dt.start()
time.sleep(30)  # the hacky fix: give both queues time to fill up before joining
# wait on both queues until everything has been processed
queue.join()
out_queue.join()