5

I am completely new to multiprocessing. I have been reading documentation about multiprocessing module. I read about Pool, Threads, Queues etc. but I am completely lost.

What I want to do with multiprocessing is that, convert my humble http downloader, to work with multiple workers. What I am doing at the moment is, download a page, parse to page to get interesting links. Continue until all interesting links are downloaded. Now, I want to implement this with multiprocessing. But I have no idea at the moment, how to organize this work flow. I had two thoughts about this. Firstly, I thought about having two queues. One queue for links that needs to be downloaded, other for links to be parsed. One worker, downloads the pages, and adds them to queue which is for items that needs to be parsed. And other process parses a page, and adds the links it finds interesting to the other queue. Problems I expect from this approach are; first of all, why download one page at a time and parse a page at a time. Moreover, how do one process know that there are items to be added to queue later, after it exhausted all items from queue.

Another approach I thought about using is that. Have a function, that can be called with an url as an argument. This function downloads the document and starts parsing it for the links. Every time it encounters an interesting link, it instantly creates a new thread running identical function as itself. The problem I have with this approach is, how do I keep track of all the processes spawned all around, how do I know if there is still processes to running. And also, how do I limit maximum number of processes.

So I am completely lost. Can anyone suggest a good strategy, and perhaps show some example codes about how to go with the idea.

BenMorel
  • 34,448
  • 50
  • 182
  • 322
yasar
  • 13,158
  • 28
  • 95
  • 160
  • This has been discussed in some depth [previously](http://stackoverflow.com/questions/731993/multiprocessing-or-multithreading) – brc Sep 24 '11 at 19:50
  • may I suggest you look at the eventlet library? You may find it suits your purposes better then using multiprocessing. – Winston Ewert Sep 25 '11 at 03:01

2 Answers2

4

Here is one approach, using multiprocessing. (Many thanks to @Voo, for suggesting many improvements to the code).

import multiprocessing as mp
import logging
import Queue
import time

logger=mp.log_to_stderr(logging.DEBUG)  # or, 
# logger=mp.log_to_stderr(logging.WARN) # uncomment this to silence debug and info messages

def worker(url_queue,seen):
    while True:
        url=url_queue.get()
        if url not in seen:
            logger.info('downloading {u}'.format(u=url))
            seen[url]=True
            # Replace this with code to dowload url
            # urllib2.open(...)
            time.sleep(0.5)
            content=url
            logger.debug('parsing {c}'.format(c=content))
            # replace this with code that finds interesting links and
            # puts them in url_queue
            for i in range(3):
                if content<5:
                    u=2*content+i-1
                    logger.debug('adding {u} to url_queue'.format(u=u))
                    time.sleep(0.5)
                    url_queue.put(u)
        else:
            logger.debug('skipping {u}; seen before'.format(u=url))
        url_queue.task_done()

if __name__=='__main__':
    num_workers=4
    url_queue=mp.JoinableQueue()
    manager=mp.Manager()
    seen=manager.dict()

    # prime the url queue with at least one url
    url_queue.put(1)
    downloaders=[mp.Process(target=worker,args=(url_queue,seen))
                 for i in range(num_workers)]
    for p in downloaders:
        p.daemon=True
        p.start()
    url_queue.join()
  • A pool of (4) worker processes are created.
  • There is a JoinableQueue, called url_queue.
  • Each worker gets a url from the url_queue, finds new urls and adds them to the url_queue.
  • Only after adding new items does it call url_queue.task_done().
  • The main process calls url_queue.join(). This blocks the main process until task_done has been called for every task in the url_queue.
  • Since the worker processes have the daemon attribute set to True, they too end when the main process ends.

All the components used in this example are also explained in Doug Hellman's excellent Python Module of the Week tutorial on multiprocessing.

unutbu
  • 842,883
  • 184
  • 1,785
  • 1,677
  • I'd personally go without the daemon setting and end the processes regularly, but if you do, you really have to join with the parsers and not the downloaders, because otherwise it's extremely likely that some file will be downloaded but never parsed. And that obviously makes it a bit more complex - so I'd just add a sentinel value to the queues so that the processes know when no data is coming any more. It's probably better from a load balancing pov (and makes the logic simpler) to just have one queue/processor pool. – Voo Sep 24 '11 at 20:37
  • @Voo: how would you know when to add a sentinel to the queue? – unutbu Sep 24 '11 at 21:30
  • My solution to this: Use `JoinableQueue`. Main thread creates process pool, puts starting tasks into queue, starts process and joins the queue. The loop for every working process: get job, if job is a sentinel, break. Otherwise execute job, put new jobs into queue, etc. Finally call `task_done` and repeat. Main thread is blocked until all jobs are finished, when it unblocks it puts nrProcesses Sentinels into the queue and we're done. – Voo Sep 24 '11 at 22:53
  • How does while True statement breaks out of loop? am I missing something here? And do all of workers share same seen list? – yasar Sep 25 '11 at 13:22
  • @yasar11732: Each time an item is put in `url_queue` a counter is incremented. Every time `url_queue.task_done()` is called, the counter is decremented. When the `counter == 0`, `url_queue.join()` stops blocking the main process. When the main process ends, the `workers` are terminated for you because each process has the `daemon` attribute set to `True`. `seen` is a shared dict -- all workers see the same values. – unutbu Sep 25 '11 at 13:41
2

What you're describing is essentially graph traversal; Most graph traversal algorithms (That are more sophisticated than depth first), keep track of two sets of nodes, in your case, the nodes are url's.

The first set is called the "closed set", and represents all of the nodes that have already been visited and processed. If, while you're processing a page, you find a link that happens to be in the closed set, you can ignore it, it's already been handled.

The second set is unsurprisingly called the "open set", and includes all of the edges that have been found, but not yet processed.

The basic mechanism is to start by putting the root node into the open set (the closed set is initially empty, no nodes have been processed yet), and start working. Each worker takes a single node from the open set, copies it to the closed set, processes the node, and adds any nodes it discovers back to the open set (so long as they aren't already in either the open or closed sets). Once the open set is empty, (and no workers are still processing nodes) the graph has been completely traversed.

Actually implementing this in multiprocessing probably means that you'll have a master task that keeps track of the open and closed sets; If a worker in a worker pool indicates that it is ready for work, the master worker takes care of moving the node from the open set to the closed set and starting up the worker. the workers can then pass all of the nodes they find, without worrying about if they are already closed, back to the master; and the master will ignore nodes that are already closed.

SingleNegationElimination
  • 151,563
  • 33
  • 264
  • 304