
I am just starting out with multithreaded/multiprocess programming and am running into a few problems. What I want to do is generate a number of requests for data to be downloaded from a remote database. These are stored in a Queue.Queue (let's call it q_in). Once I've generated all the requests, I start a limited number of instances of my thread class, each of which takes q_in and another Queue (q_out) as input. Each thread get()s jobs from q_in and puts the results on q_out. Since this part is IO-bound, I thought threads would be a good choice. The results from q_out are consumed by a pool of processes that does some work on them. That part is CPU-bound, hence I thought processes would be a good choice.

Now this seems to work OK, except that I've run into an odd behaviour, which I've demonstrated below.

import threading
import Queue
import multiprocessing as mp

class TestThread(threading.Thread):

    def __init__(self, threadnr, resultPool, jobPool):
        self.threadnr = threadnr
        self.resultPool = resultPool
        self.jobPool = jobPool
        threading.Thread.__init__(self)

    def run(self):
        while True:
            job = self.jobPool.get()
            if job is not None:
                # busy loop standing in for the (slow) download work
                for a in range(10):
                    for i in xrange(1000000):
                        pass
                print "Thread nr %d finished job %d" % (self.threadnr, job)
                self.resultPool.put([self.threadnr, job + 1])
                self.jobPool.task_done()

def test(i):
    print mp.current_process().name,"test",i
    return mp.current_process().name,"test",i

if __name__ == '__main__':        
    q_in = Queue.Queue()   
    q_out = Queue.Queue() 
    nr_jobs = 20
    res = []
    nr_threads = 4
    threads = []

    for i in range(nr_jobs):
        q_in.put(i)

    for i in range(nr_threads):
        t = TestThread(i,q_out,q_in)
        t.start()
        threads.append(t)

    p_pool = mp.Pool(4)   

    for i in range(nr_jobs):
        job = q_out.get(block=True)
        print "Got job",job
        res.append(p_pool.apply_async(test,(job,)))

    p_pool.close()
    p_pool.join()

    for r in res:
        print r.get()

    for t in threads:
        t.join()

The output of this is:

Thread nr 2 finished job 2
Got job [2, 3]
Thread nr 0 finished job 0
Got job [0, 1]
Thread nr 1 finished job 1
Got job [1, 2]
Thread nr 3 finished job 3
Got job [3, 4]
Thread nr 2 finished job 4
Got job Thread nr 0 finished job 5[
2, 5]
Got job [0, 6]
Thread nr 1 finished job 6
Got job [1, 7]
Thread nr 3 finished job 7
Got job [3, 8]
Thread nr 2 finished job 8
Got job [2, 9]
Thread nr 0 finished job 9
Got job [0, 10]
PoolWorker-4 test [1, 2]
PoolWorker-4 test [1, 7]
PoolWorker-3 test [3, 4]
PoolWorker-3 test [3, 8]
PoolWorker-2 test [0, 1]
PoolWorker-2 test [0, 6]
PoolWorker-2 test [0, 10]
PoolWorker-1 test [2, 3]
PoolWorker-1 test [2, 5]
PoolWorker-1 test [2, 9]
('PoolWorker-1', 'test', [2, 3])
('PoolWorker-2', 'test', [0, 1])
('PoolWorker-4', 'test', [1, 2])
('PoolWorker-3', 'test', [3, 4])
('PoolWorker-1', 'test', [2, 5])
('PoolWorker-2', 'test', [0, 6])
('PoolWorker-4', 'test', [1, 7])
('PoolWorker-3', 'test', [3, 8])
('PoolWorker-1', 'test', [2, 9])
('PoolWorker-2', 'test', [0, 10])

This is a test program that broadly works like my real program. What I find odd is that even though the threads take relatively long to finish, the processes don't print anything until the threads have all done their jobs. The jobs do seem to be consumed continuously, but the output from the processes doesn't appear until after all the threads are done.

In this example it's fairly harmless (if annoying), but in my real program this queueing-up of output seems to cause a memory error, as all the output from the processes is delayed until the last thread is done.

And as an add-on question: is it even a good idea to mix threads and processes, or should I stick to one or the other?

I would appreciate any thoughts on the matter.

JPJ
  • On what platform are you? Your example runs correctly under both Ubuntu Linux and Windows 7. As a side note your Threads are never ended due to the `while True`. – Wessie Sep 27 '12 at 15:14
  • It runs correctly on my Linux machine (Ubuntu 10.04 64 bit, python 2.6.5). After while true do a try except block where the try block gets an item from the queue. The except block should break the loop. If the queue is empty it will throw an exception and the loop will end. – b10hazard Sep 27 '12 at 15:24
  • Nice idea with try except. I do have a method to stop the thread in my real program though, but doing it like that would be a better way of stopping them I believe. I'm running it on 2.7.2 on a Win7 32-bit machine. – JPJ Sep 27 '12 at 17:23
  • I've tried adding a sys.stdout.flush() after the print statement in the test() function and this gives the interleaved output that I expected. It seems a bit odd that this is needed though, or am I missing something? – JPJ Sep 27 '12 at 18:00
  • A simplified version of this question has since been answered [here on stack overflow](http://stackoverflow.com/questions/18234469/python-multithreaded-print-statements-delayed-until-all-threads-complete-executi) – Hawkwing Aug 14 '13 at 14:56
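For anyone reading later, the try/except shutdown pattern suggested in the comments could look roughly like this. This is a simplified sketch with hypothetical names (Worker, jobs, results), not the questioner's real code; it is written to run under both Python 2 and 3:

```python
import threading
try:
    import Queue as queue  # Python 2 name
except ImportError:
    import queue           # Python 3 name

class Worker(threading.Thread):
    # Simplified worker: exits its loop once the job queue is drained,
    # so join() on the thread actually returns.
    def __init__(self, jobs, results):
        threading.Thread.__init__(self)
        self.jobs = jobs
        self.results = results

    def run(self):
        while True:
            try:
                job = self.jobs.get(block=False)
            except queue.Empty:
                break  # no more jobs: end the loop instead of blocking forever
            self.results.put(job + 1)
            self.jobs.task_done()

jobs, results = queue.Queue(), queue.Queue()
for i in range(5):
    jobs.put(i)
w = Worker(jobs, results)
w.start()
w.join()  # returns because run() breaks on an empty queue
```

Note that get(block=False) only works cleanly here because all jobs are enqueued before the worker starts; if jobs arrive while the workers run, a sentinel value (e.g. putting None once per worker) is a more robust shutdown signal.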

0 Answers