
I want to put the result generated by the function passed to pool.apply_async() into a queue. Everything seems fine, but the error confuses me a lot.

My purpose is to have multiple asynchronous producers (maybe not the correct term here) and multiple consumers.

Here is my toy example:

from multiprocessing import Pool
import multiprocessing
from threading import Thread

from six.moves import xrange
pool = Pool(processes=2, maxtasksperchild=1000)


# resp_queue = multiprocessing.Queue(1000)
manager = multiprocessing.Manager()
resp_queue = manager.Queue()

rang = 10000


def fetch_page(url):
    resp_queue.put(url)


def parse_response():
    url = resp_queue.get()
    print(url)

r_threads = []


def start_processing():
    for i in range(2):
        r_threads.append(Thread(target=parse_response))
        print("start %s thread.." % i)
        r_threads[-1].start()


urls = map(lambda x: "this is url %s" % x, xrange(rang))
for i in xrange(rang):
    pool.apply_async(fetch_page, (urls[i],))

start_processing()

pool.close()
pool.join()

The error reads:

> Process PoolWorker-1:
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
>     self.run()
>   File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
>     self._target(*self._args, **self._kwargs)
>   File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
>     task = get()
>   File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
>     return recv()
> AttributeError: 'module' object has no attribute 'fetch_page'
>
> Process PoolWorker-2: (identical traceback)
> AttributeError: 'module' object has no attribute 'fetch_page'
>
> start 0 thread..
> start 1 thread..

I have read this answer but found it very strange, and it doesn't work on my Ubuntu machine.

Any suggestions are highly appreciated. Thanks very much.

Lerner Zhang
  • Define your work function prior to declaring the Pool. – georgexsh Dec 04 '17 at 18:19
  • @scriptboy and georgexsh You are right! I know from [this answer](https://stackoverflow.com/a/18947948/3552975) that "when you create a pool the workers are created by forking the current process". – Lerner Zhang Dec 05 '17 at 01:43
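To illustrate the point from the comments, here is a minimal sketch of the toy example with fetch_page (and the queue it uses) defined before the Pool is created, so that the forked workers already have everything in their module namespace. This assumes the default fork start method on Linux; the smaller range is only for illustration.

from multiprocessing import Pool
import multiprocessing

manager = multiprocessing.Manager()
resp_queue = manager.Queue()


def fetch_page(url):
    # Producer: runs in a pool worker and puts the "result" on the shared queue.
    resp_queue.put(url)


# The Pool is created only after fetch_page exists at module level,
# so the forked workers can resolve it when they unpickle the task.
pool = Pool(processes=2, maxtasksperchild=1000)

for i in range(10):  # shortened from 10000 for illustration
    pool.apply_async(fetch_page, ("this is url %s" % i,))

pool.close()
pool.join()

# All tasks have finished after join(), so draining the queue here is safe.
while not resp_queue.empty():
    print(resp_queue.get())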

1 Answer


Have a look at the code below. Changes I made to your version:

  • I'm using map_async instead of apply_async, as it takes an iterable and splits the work between the workers nicely.
  • I've added a while loop to your parse_response function (now get_url) so each thread keeps pulling values from the queue until it is empty.
  • Pool instantiation and the calls on it sit under if __name__ == '__main__':, which is required for multiprocessing on Windows (as far as I know; I might be wrong, I'm on Ubuntu).

from multiprocessing import Pool
import multiprocessing
from threading import Thread

manager = multiprocessing.Manager()
url_queue = manager.Queue()

rang = 10000


def put_url(url):
    # Producer: runs in a pool worker and puts one url on the shared queue.
    url_queue.put(url)


def get_url(thread_id):
    # Consumer: runs in a thread and drains the queue until it is empty.
    while not url_queue.empty():
        print('Thread {0} got url {1}'.format(str(thread_id), url_queue.get()))


r_threads = []


def start_threading():
    for i in range(2):
        r_threads.append(Thread(target=get_url, args=(i,)))
        print("start %s thread.." % i)
        r_threads[-1].start()
    for i in r_threads:
        i.join()


urls = ["url %s" % x for x in range(rang)]


if __name__ == '__main__':
    pool = Pool(processes=2, maxtasksperchild=1000)
    pool.map_async(put_url, urls)  # producers: pool workers fill the queue
    start_threading()              # consumers: two threads drain it
    pool.close()
    pool.join()

Prints:

start 0 thread..
start 1 thread..
Thread 0 got url 0
Thread 0 got url 1
Thread 1 got url 2
Thread 0 got url 3
Thread 0 got url 4
Thread 1 got url 5
Thread 0 got url 6
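
One caveat that is not part of the answer above: the while not url_queue.empty() loop can race with the producers, because a consumer thread that checks the queue before anything has been put into it will exit immediately. A common alternative is a blocking get() with one sentinel per consumer, pushed after the pool has finished. Below is a minimal sketch of that variation; SENTINEL is a marker value introduced here only for illustration.

from multiprocessing import Pool
import multiprocessing
from threading import Thread

manager = multiprocessing.Manager()
url_queue = manager.Queue()
SENTINEL = None  # hypothetical end-of-work marker, one per consumer thread


def put_url(url):
    url_queue.put(url)


def get_url(thread_id):
    while True:
        item = url_queue.get()  # blocks instead of polling empty()
        if item is SENTINEL:
            break
        print('Thread {0} got url {1}'.format(thread_id, item))


if __name__ == '__main__':
    threads = [Thread(target=get_url, args=(i,)) for i in range(2)]
    for t in threads:
        t.start()

    pool = Pool(processes=2)
    pool.map_async(put_url, ["url %s" % x for x in range(100)])
    pool.close()
    pool.join()  # wait until all producers have finished

    for _ in threads:  # one sentinel wakes up each consumer so it can exit
        url_queue.put(SENTINEL)
    for t in threads:
        t.join()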

Evya