I am just starting out with multithreaded/multiprocessing code and have run into a few problems. What I want to do is generate a number of requests for data that should be downloaded from a remote database. These are stored in a Queue.Queue (let's call it q_in). Once I've generated all the requests, I start a limited number of instances of my thread class, each of which takes q_in and another Queue (q_out) as input. Each thread get()s jobs from q_in and puts the results on q_out. This part is IO-bound, hence I thought threads would be a good choice. The results from q_out are consumed by a pool of processes which does some work on the results. This part is CPU-bound, hence I thought processes would be a good choice.
Now this seems to work OK, except that I've run into an odd behaviour, which I've demonstrated below.
import threading
import Queue
import multiprocessing as mp


class TestThread(threading.Thread):
    def __init__(self, threadnr, resultPool, jobPool):
        threading.Thread.__init__(self)
        self.threadnr = threadnr
        self.resultPool = resultPool
        self.jobPool = jobPool

    def run(self):
        while True:
            job = self.jobPool.get()
            if job is None:
                # Sentinel: no more jobs, so leave the loop and let join() return.
                self.jobPool.task_done()
                break
            # Busy loop standing in for the slow download.
            for a in range(10):
                for i in xrange(1000000):
                    pass
            print "Thread nr %d finished job %d" % (self.threadnr, job)
            self.resultPool.put([self.threadnr, job + 1])
            self.jobPool.task_done()


def test(i):
    # Stand-in for the CPU-bound work done in the process pool.
    print mp.current_process().name, "test", i
    return mp.current_process().name, "test", i


if __name__ == '__main__':
    q_in = Queue.Queue()
    q_out = Queue.Queue()
    nr_jobs = 20
    res = []
    nr_threads = 4
    threads = []

    for i in range(nr_jobs):
        q_in.put(i)
    # One sentinel per thread so that every worker thread eventually exits.
    for i in range(nr_threads):
        q_in.put(None)

    for i in range(nr_threads):
        t = TestThread(i, q_out, q_in)
        t.start()
        threads.append(t)

    p_pool = mp.Pool(4)
    # Hand each result to the process pool as soon as a thread produces it.
    for i in range(nr_jobs):
        job = q_out.get(block=True)
        print "Got job", job
        res.append(p_pool.apply_async(test, (job,)))

    p_pool.close()
    p_pool.join()
    for r in res:
        print r.get()
    for t in threads:
        t.join()
The output of this is:
Thread nr 2 finished job 2
Got job [2, 3]
Thread nr 0 finished job 0
Got job [0, 1]
Thread nr 1 finished job 1
Got job [1, 2]
Thread nr 3 finished job 3
Got job [3, 4]
Thread nr 2 finished job 4
Got job Thread nr 0 finished job 5[
2, 5]
Got job [0, 6]
Thread nr 1 finished job 6
Got job [1, 7]
Thread nr 3 finished job 7
Got job [3, 8]
Thread nr 2 finished job 8
Got job [2, 9]
Thread nr 0 finished job 9
Got job [0, 10]
PoolWorker-4 test [1, 2]
PoolWorker-4 test [1, 7]
PoolWorker-3 test [3, 4]
PoolWorker-3 test [3, 8]
PoolWorker-2 test [0, 1]
PoolWorker-2 test [0, 6]
PoolWorker-2 test [0, 10]
PoolWorker-1 test [2, 3]
PoolWorker-1 test [2, 5]
PoolWorker-1 test [2, 9]
('PoolWorker-1', 'test', [2, 3])
('PoolWorker-2', 'test', [0, 1])
('PoolWorker-4', 'test', [1, 2])
('PoolWorker-3', 'test', [3, 4])
('PoolWorker-1', 'test', [2, 5])
('PoolWorker-2', 'test', [0, 6])
('PoolWorker-4', 'test', [1, 7])
('PoolWorker-3', 'test', [3, 8])
('PoolWorker-1', 'test', [2, 9])
('PoolWorker-2', 'test', [0, 10])
It's a test program that broadly works like my real program. What I find odd is that, even though the threads take relatively long to finish, the processes don't print anything until the threads have all done their jobs. It does seem like the jobs are consumed continuously, but the output from the processes doesn't appear until after all the threads are done.
In this example it's fairly harmless (if annoying), but in my real program this... queueing of output seems to cause a memory error as all the output from the processes is delayed until the last thread is done.
And as an add-on question: is it even a good idea to mix threads and processes, or should I stick to one or the other?
I would appreciate any thoughts on the matter.