I want to put the generated result of the func in pool.apply_async()
method into a queue, everything seems very well but the error confused me a lot.
My purpose is try to make multiple asynchronized producers(maybe not correct here) and multiple consumers.
Here is my toy example:
from multiprocessing import Pool
import multiprocessing
from threading import Thread
from six.moves import xrange
pool = Pool(processes=2, maxtasksperchild=1000)
# resp_queue = multiprocessing.Queue(1000)
manager = multiprocessing.Manager()
resp_queue = manager.Queue()
rang = 10000
def fetch_page(url):
resp_queue.put(url)
def parse_response():
url = resp_queue.get()
print(url)
r_threads = []
def start_processing():
for i in range(2):
r_threads.append(Thread(target=parse_response))
print("start %s thread.." % i)
r_threads[-1].start()
urls = map(lambda x: "this is url %s" % x, xrange(rang))
for i in xrange(rang):
pool.apply_async(fetch_page, (urls[i],))
start_processing()
pool.close()
pool.join()
The error reads that:
> Process PoolWorker-1: Process PoolWorker-2: Traceback (most recent
> call last): Traceback (most recent call last): File
> "/usr/lib/python2.7/multiprocessing/process.py", line 258, in
> _bootstrap File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
> self.run()
> self.run() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run File
> "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
> self._target(*self._args, **self._kwargs)
> self._target(*self._args, **self._kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
> File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
> task = get()
> task = get() File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
> File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
> return recv()
> return recv() AttributeError: 'module' object has no attribute 'fetch_page' AttributeError: 'module' object has no attribute
> 'fetch_page' start 0 thread.. start 1 thread..
I have read this answer but found it very strange, and this answer doesn't work on my Ubuntu machine.
Any suggestions are highly appreciated. Thanks very much.