
I'm pretty new to python (I mainly write code in Java). I have a python script that's essentially a crawler. It calls phantomjs, which loads up the page, returns its source, and a list of urls that it found in the page.
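For reference, the phantomjs call is essentially a subprocess invocation along these lines (a rough sketch rather than my exact code; the script name `crawl.js` and the JSON output format are just placeholders):

import json
import subprocess

def phantomjs(url):
    # Run the PhantomJS script against the url and capture its stdout.
    # The script is assumed to print a JSON object of the form
    # {"source": "...", "urls": ["...", ...]}.
    output = subprocess.check_output(["phantomjs", "crawl.js", url])
    result = json.loads(output.decode("utf-8"))
    return result["urls"]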

I've been trying to use Python 3's multiprocessing module to do this, but I can't figure out how to use a shared queue that workers can also add to. I keep getting unpredictable results.

My previous approach used a global list of URLs, out of which I extracted a chunk and sent to the workers using map_async. At the end, I would gather all the returned URLs and append them to the global list. The problem is that each "chunk" takes as long as the slowest worker (a rough sketch of that approach is below). I'm trying to modify it so that whenever a worker is done, it can pick up the next URL. However, I don't think I'm doing it correctly.
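The chunked version looked roughly like this (a reconstruction for context, not my exact code; it assumes the `phantomjs` helper above, and the chunk size and seed URL are placeholders):

import multiprocessing

CHUNK_SIZE = 10  # placeholder chunk size

def worker(url):
    return phantomjs(url)  # list of urls found on the page

if __name__ == '__main__':
    all_urls = ["<some-url>"]  # global list of urls
    processed = 0

    with multiprocessing.Pool() as pool:
        while processed < len(all_urls):
            chunk = all_urls[processed:processed + CHUNK_SIZE]
            processed += len(chunk)
            # map_async waits for the whole chunk, so each round
            # takes as long as its slowest worker
            for returned_urls in pool.map_async(worker, chunk).get():
                all_urls.extend(returned_urls)  # (deduplication omitted)

Here's what I have so far with the queue-based version: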

import multiprocessing
from multiprocessing import Pool

def worker(url, urls):
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " loading " + url)
    returned_urls = phantomjs(url)  # my helper that shells out to phantomjs
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " returning " + str(len(returned_urls)) + " URLs")

    for returned_url in returned_urls:
        urls.put(returned_url, block=True)

    print("There are " + str(urls.qsize()) + " URLs in total.\n")

if __name__ == '__main__':    
    manager = multiprocessing.Manager()
    urls = manager.Queue()
    urls.put("<some-url>")

    pool = Pool()
    while True:
        url = urls.get(block=True)
        pool.apply_async(worker, (url, urls))

    pool.close()
    pool.join()

If there is a better way to do this, please let me know. I'm crawling a known site, and the eventual terminating condition is when there are no URLs left to process. But right now it looks like it will just keep running forever. I'm not sure if I should use queue.empty(), because the docs say it's not reliable.

Vivin Paliath
  • See related: http://stackoverflow.com/questions/17241663/filling-a-queue-and-managing-multiprocessing-in-python (*I don't think your design pattern is quite right*) I believe you want N workers accessing a shared queue cooperatively. – James Mills Jun 15 '15 at 04:20
  • @JamesMills That example makes a lot of sense! Is it alright to add to the queue within `worker_main`? – Vivin Paliath Jun 15 '15 at 04:24
  • Also, I tried that out and it looks like it exits almost immediately, even with the `time.sleep(10)`. The phantomjs call takes some time to return, but the script exits before that. – Vivin Paliath Jun 15 '15 at 04:32
  • Well I mean; I'm not sure how to answer your question anymore :) Haha! – James Mills Jun 15 '15 at 04:36
  • Haha, no worries. Thanks :) I will experiment some more and update my question. – Vivin Paliath Jun 15 '15 at 04:38
  • I wouldn't use the code in that SO link; it abuses the initializer functionality of the Pool class to create a pool of workers that can't work with the normal pool functions. They can get the same result by just creating processes with ```Process()```. The code you posted in your Q is actually a fine way of doing it, you are just missing an end condition. Try to figure out how you can know if you are done searching (counting urls vs no links?). You can then unblock the queue by passing a sentinel value. – bj0 Jun 15 '15 at 17:17
  • Use celery. It implements this. – Marcin Jun 16 '15 at 19:51
  • @bj0 Thanks for your comment! I actually tried to find a description of the pattern that was posted in that other question. I looked up the documentation and saw that what was being passed in was used as the initializer function. I came to the same conclusion as you did, that it was an abuse of functionality, so I tried doing it in a manner similar to my question. – Vivin Paliath Jun 16 '15 at 19:58
  • @Marcin I will take a look into that. I have used python before but not extensively, so I am not familiar with all the frameworks. – Vivin Paliath Jun 16 '15 at 19:59

2 Answers


This is how I solved the problem. I originally went with the design posted in this answer but bj0 mentioned that it was abusing the initializer function. So I decided to do it using apply_async, in a fashion similar to the code posted in my question.

Since my workers modify the queue they are reading URLs from (they add to it), I thought that I could simply run my loop like so:

while not urls.empty():
    pool.apply_async(worker, (urls.get(), urls))

I expected this to work, since the workers add to the queue and I assumed apply_async would wait if all the workers were busy. It didn't work as I expected and the loop terminated early. The problem was that apply_async does not block when all workers are busy; instead, it queues up submitted tasks, which means that urls eventually becomes empty and the loop terminates. The only time the loop blocks is if the queue is empty when you call urls.get(); at that point it waits for more items to become available in the queue.

I still needed to figure out a way to terminate the loop. The condition is that the loop should terminate when none of the workers return new URLs. To do this, I use a shared dict that maps a process name to 0 if that process didn't return any URLs, and to 1 otherwise. I check the sum of the values on every iteration of the loop, and if it is ever 0 I know that I am done.

The basic structure ended up being like this:

import multiprocessing
from multiprocessing import Manager, Pool

def worker(url, url_queue, proc_empty_urls_queue):

    returned_urls = phantomjs(url) # calls phantomjs and waits for output
    if len(returned_urls) > 0:
        proc_empty_urls_queue.put(
            [multiprocessing.current_process().name, 1]
        )
    else:
        proc_empty_urls_queue.put(
            [multiprocessing.current_process().name, 0]
        )

    for returned_url in returned_urls:
        url_queue.put(returned_url)

def empty_url_tallier(proc_empty_urls_queue, proc_empty_urls_dict):
    while True:
        # This may not be necessary. I don't know if this worker is run
        # by the same process every time. If not, it is possible that
        # the worker was assigned the task of fetching URLs, and returned
        # some. So let's make sure that we set its entry to zero anyway.
        # If this worker is run by the same process every time, then this
        # stuff is not necessary.
        proc_name = multiprocessing.current_process().name
        proc_empty_urls_dict[proc_name] = 0

        proc_empty_urls = proc_empty_urls_queue.get()
        if proc_empty_urls == "done": # poison pill
            break

        proc_id = proc_empty_urls[0]
        proc_empty_url = proc_empty_urls[1]
        proc_empty_urls_dict[proc_id] = proc_empty_url

manager = Manager()

urls = manager.Queue()
proc_empty_urls_queue = manager.Queue()
proc_empty_urls_dict = manager.dict()

pool = Pool(33)

# (my full program also starts a separate writer task here; it's omitted from this basic structure)
pool.apply_async(empty_url_tallier, (proc_empty_urls_queue, proc_empty_urls_dict))

# Run the first apply synchronously 
urls.put("<some-url>")
pool.apply(worker, (urls.get(), urls, proc_empty_urls_queue))
while sum(proc_empty_urls_dict.values()) > 0:
    pool.apply_async(worker, (urls.get(), urls, proc_empty_urls_queue))

proc_empty_urls_queue.put("done") # poison pill
pool.close()
pool.join()
Vivin Paliath
  • That's an interesting approach, but I believe it is susceptible to a race condition. If a no-url worker ends while another worker (who previously had no-url) is still processing phantomjs, the while loop could end before that worker finishes (with potential new urls). Also, you are using a shared dict, so why do you need a second queue? You should be able to pass the dict to the workers and let them use it directly. – bj0 Jun 16 '15 at 22:15
  • The easiest way to eliminate the race condition (and get rid of the dict) is to count urls. Each time urls.get() gives you a url, increment a counter. Each time you get a 'no-url' sentinel value, decrement it. When the counter reaches 0, you should be out of urls. This of course assumes no circular references, which would cause problems... – bj0 Jun 16 '15 at 22:17
  • @bj0 Good point. I figured there would be some sort of race condition. I used the shared dict because, to be honest, I'm still a little hazy on how multiprocessing is done in python and I wasn't sure if it was ok to modify dict properties like that directly on the worker. – Vivin Paliath Jun 17 '15 at 00:31
  • @bj0 I like the counter approach - I was going down that route initially but I wasn't sure about atomic adds etc. Can I use `Value` from `Manager` for that? A lot of my confusion stems from not being completely familiar with how multiprocessing works in Python and just Python in general. I work in Java most of the time! Thank you for the helpful comments. I realize my code is probably not very clean by Python standards and I'm probably violating many best practices. :) – Vivin Paliath Jun 17 '15 at 00:34
  • the objects from manager (like dict) are made so that they can be accessed directly by workers (they internally handle proxy/locking between processes). If you handle the counter in the main process (in the while loop, check the return value of urls.get() then adjust the counter), you can just use a normal integer and not worry about concurrent access. I can throw up an answer if that doesn't make sense. – bj0 Jun 17 '15 at 16:48
  • @bj0 So it would have to be done in the main process? It shouldn't be modified in the worker process? I'm using `manager.Value()`. I saw an example where it was being modified in the worker process and so in my code I simply do `counter.value += (len(returned_urls) - 1)` (see the sketch after this comment thread). – Vivin Paliath Jun 17 '15 at 16:51
  • @bj0 Thanks! I'll take a look. – Vivin Paliath Jun 17 '15 at 19:19
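For anyone following the thread above, here is a minimal sketch of the locked-counter variant being discussed (the names are hypothetical and `phantomjs` is the helper from the question; a `manager.Value` proxy is not atomic on its own, which is why a `manager.Lock` guards the update, and keeping a plain int in the main process avoids the locking entirely):

import multiprocessing

def worker(url, url_queue, counter, counter_lock):
    returned_urls = phantomjs(url)  # helper from the question
    for returned_url in returned_urls:
        url_queue.put(returned_url)
    with counter_lock:
        # one url consumed, len(returned_urls) new ones produced;
        # when the counter reaches 0 there is no outstanding work
        counter.value += len(returned_urls) - 1

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    url_queue = manager.Queue()
    counter = manager.Value('i', 1)   # one outstanding seed url
    counter_lock = manager.Lock()
    url_queue.put("<some-url>")
    # submit workers with apply_async as in the answer above and
    # stop handing out urls once counter.value drops to 0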

Here is what I would probably do:

import multiprocessing
from multiprocessing import Pool

def worker(url, urls):
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " loading " + url)
    returned_urls = phantomjs(url)
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " returning " + str(len(returned_urls)) + " URLs")

    for returned_url in returned_urls:
        urls.put(returned_url, block=True)

    # signal finished processing this url
    urls.put('no-url')

    print("There are " + str(urls.qsize()) + " URLs in total.\n")

if __name__ == '__main__':    
    manager = multiprocessing.Manager()
    pool = Pool()
    urls = manager.Queue()

    # start first url before entering loop
    counter = 1
    pool.apply_async(worker, ("<some-url>", urls))

    while counter > 0:
        url = urls.get(block=True)
        if url == 'no-url':
            # a url has finished processing
            counter -= 1
        else:
            # a new url needs to be processed
            counter += 1
            pool.apply_async(worker, (url, urls))

    pool.close()
    pool.join()

Whenever a url is popped off the queue, increment the counter. Think of it as a "currently processing url" counter. When a 'no-url' is popped off the queue, a "currently processing url" has finished, so decrement the counter. As long as the counter is greater than 0, there are urls that haven't finished processing and returned 'no-url' yet.

EDIT

As I said in the comment (put here for anyone else who reads it): when using a multiprocessing.Pool, instead of thinking of it as a collection of individual processes, it's best to think of it as a single construct that executes your function each time it gets data (concurrently when possible). This is most useful for data-driven problems where you don't track or care about individual worker processes, only the data being processed.
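To illustrate that data-driven view, here is a minimal sketch (the crawl function and the input urls are hypothetical stand-ins, not code from the question):

from multiprocessing import Pool

def crawl(url):
    # placeholder for whatever per-item work you need (e.g. the phantomjs call)
    return url.upper()

if __name__ == '__main__':
    urls = ["http://example.com/a", "http://example.com/b", "http://example.com/c"]
    with Pool() as pool:
        # you only deal with the stream of inputs and results;
        # which worker process handles which item is irrelevant
        for result in pool.imap_unordered(crawl, urls):
            print(result)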

bj0
  • Ah, I see what you're doing. So I think I can adapt this to my case. For me, it's not so much that I want to know when a URL has finished processing, but that I want to know when a worker doesn't return *any* URLs. Hence the terminating condition is when *all* workers finish their tasks and *none* of them returned URLs. – Vivin Paliath Jun 17 '15 at 19:24
  • I thought about that first, but it's more challenging because with a pool of workers, each worker *process* executes ```worker``` over and over, sitting idle between executions. So you could have a situation where each worker *process* finishes its current execution of ```worker``` and gets no urls, but there could still be urls backed up in the queue waiting to be passed to the next idle worker. The only way to know *for sure* that there is no more work to be done is to know that all received urls have finished processing. – bj0 Jun 17 '15 at 21:37
  • It helps to think of a pool not as individual workers, but a single entity that runs a function over and over. Pools tend to be used in more data-driven problems, where you don't track or care about the number of worker processes. – bj0 Jun 17 '15 at 21:38
  • True, in my particular case I do need to know what each instance of the worker returns as the whole pool is operating. That's why I thought that I could try modifying value in the worker itself. I have it running at home and I'll take a look at it when I get back from work to see how it did. I like your idea of using sentinel values, so I'll do the same thing. – Vivin Paliath Jun 17 '15 at 21:42