5

Hello! I am trying to write web crawler with python. I wanted to use python multithreading. Even after reading earlier suggested papers and tutorials, I still have problem. My code is here (whole source code is here):

class Crawler(threading.Thread):

    global g_URLsDict 
    varLock = threading.Lock()
    count = 0

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.url = self.queue.get()

    def run(self):
        while 1:
            print self.getName()+" started" 
            self.page = getPage(self.url)
            self.parsedPage = getParsedPage(self.page, fix=True)
            self.urls = getLinksFromParsedPage(self.parsedPage)

            for url in self.urls:

                self.fp = hashlib.sha1(url).hexdigest()

                #url-seen check
                Crawler.varLock.acquire() #lock for global variable g_URLs
                if self.fp in g_URLsDict:
                    Crawler.varLock.release() #releasing lock
                else:
                    #print url+" does not exist"
                    Crawler.count +=1
                    print "total links: %d"%len(g_URLsDict)
                    print self.fp
                    g_URLsDict[self.fp] = url
                    Crawler.varLock.release() #releasing lock
                    self.queue.put(url)

                    print self.getName()+ " %d"%self.queue.qsize()
                    self.queue.task_done()
            #self.queue.task_done()
        #self.queue.task_done()


print g_URLsDict
queue = Queue.Queue()
queue.put("http://www.ertir.com")

for i in range(5):
    t = Crawler(queue)
    t.setDaemon(True)
    t.start()

queue.join()

it does not work as needed, it does not give any result after thread 1 and it excutes differently some time gives this error:

Exception in thread Thread-2 (most likely raised during interpreter shutdown):

How can I fix it? And also I do not think this is more effective than just for loop.

I have tried to fix run():

def run(self):
    while 1:
        print self.getName()+" started" 
        self.page = getPage(self.url)
        self.parsedPage = getParsedPage(self.page, fix=True)
        self.urls = getLinksFromParsedPage(self.parsedPage)

        for url in self.urls:

            self.fp = hashlib.sha1(url).hexdigest()

            #url-seen check
            Crawler.varLock.acquire() #lock for global variable g_URLs
            if self.fp in g_URLsDict:
                Crawler.varLock.release() #releasing lock
            else:
                #print url+" does not exist"
                print self.fp
                g_URLsDict[self.fp] = url
                Crawler.varLock.release() #releasing lock
                self.queue.put(url)

                print self.getName()+ " %d"%self.queue.qsize()
                #self.queue.task_done()
        #self.queue.task_done()
    self.queue.task_done()

I experimented with task_done() command, in different places, can anyone explain difference?

torayeff
  • 9,296
  • 19
  • 69
  • 103
  • Is that first example missing some indentation? Looks like the class members should be pushed in by one level.. ? – Jon Cage May 29 '12 at 14:23
  • Can you post a working example? What modules do you import? – Jon Cage May 29 '12 at 14:35
  • http://snipt.org/ujhW9 working source code – torayeff May 29 '12 at 14:42
  • I'm confused. You create 5 threads that each read from the queue once. Then lots of urls get added to the queue from those first five pages, but they are never read? – Bittrance May 29 '12 at 14:46
  • what do you mean? they are added to the queue, but threads also read urls from that queue. – torayeff May 29 '12 at 14:51
  • I suggest not looping till the end of time, and picking a subset of the internet you would like to crawl. – mikerobi May 29 '12 at 14:53
  • No, urls are only read at `Crawler.__init__` – schlamar May 29 '12 at 14:55
  • Not really the point of the question, but couldn't you use multiprocessing.Manager().dict() to create a dict object that has internal locking? That would simplify the code a bit. – Bittrance May 29 '12 at 15:06
  • i am new to concurrency in python. as I understood, I just need to replace standard dictionary with Manager().dict(), so then I will not need to use locks??? – torayeff May 29 '12 at 15:08
  • Something like `g_URLsDict = Manager().dict()` will give you a dict-like object with internal locking. It's meant for multiple processes, but it should handle multiple threads as well. – Bittrance May 29 '12 at 15:14
  • I may be wrong tho, see http://stackoverflow.com/questions/2545961/how-to-synchronize-a-python-dict-with-multiprocessing – Bittrance May 29 '12 at 15:19

1 Answers1

4

You only call self.url = self.queue.get() when the threads initialise. You need to try and re-acquire urls from your queue inside your while loop if you want to pick up new urls for processing further down the line.

Try replacing self.page = getPage(self.url) with self.page = getPage(self.queue.get()). Be aware that the get function will block indefinitely. You probably want to timeout after a while and add some way for your background threads to exit gracefully by request (which would eliminate the Exception you saw).

There are some good examples on effbot.org which use get() in the way I've described above.

Edit - Answers to your initial comments:

Have a look at the docs for task_done(); For every call to get() (which doesn't timeout) you should call task_done() which tells any blocking calls to join() that everything on that queue is now processed. Each call to get() will block (sleep) while it waits for a new url to be posted on the queue.

Edit2 - Try this alternative run function:

def run(self):
    while 1:
        print self.getName()+" started"
        url = self.queue.get() # <-- note that we're blocking here to wait for a url from the queue
        self.page = getPage(url)
        self.parsedPage = getParsedPage(self.page, fix=True)
        self.urls = getLinksFromParsedPage(self.parsedPage)

        for url in self.urls:

            self.fp = hashlib.sha1(url).hexdigest()

            #url-seen check
            Crawler.varLock.acquire() #lock for global variable g_URLs
            if self.fp in g_URLsDict:
                Crawler.varLock.release() #releasing lock
            else:
                #print url+" does not exist"
                Crawler.count +=1
                print "total links: %d"%len(g_URLsDict)
                print self.fp
                g_URLsDict[self.fp] = url
                Crawler.varLock.release() #releasing lock
                self.queue.put(url)

                print self.getName()+ " %d"%self.queue.qsize()

        self.queue.task_done() # <-- We've processed the url this thread pulled off the queue so indicate we're done with it.
Jon Cage
  • 36,366
  • 38
  • 137
  • 215
  • and how about task_done() command, where should i put it, and how it affects? is thread is sleeped and time is given to another thread when task_done() command is called? if so where is concurrency? I am confused. – torayeff May 29 '12 at 15:01
  • calling task_done() will just tell other threads that is Queue can be used? I mean, i get url from queue and then immediately call task_done(), and the second way I get url, process it, parse pages (at this time I want other threads to use Queue, because processing pages takes some time), then I call task_done(), what is the difference? Which one will be effective – torayeff May 29 '12 at 15:18
  • as I expected putting task_done() right after q.get() terminates all threads – torayeff May 29 '12 at 15:23
  • Please read the docs I linked to; task_done() is only telling queue.join() that one call to queue.put() has been dealt with. It's the calls to queue.get() which do the blocking you're interested in. Each time you call queue.put() one thread which was blocking on it's call to queue.get() will wake up and start processing... – Jon Cage May 29 '12 at 15:24
  • the point I can not understand is this: every created thread gets url from Queue, then it processes it, and after that calls task_done(), then where is the real multithreading efficiency, threads are just changing places, they do not anything in parallel, that can give speed ???? – torayeff May 29 '12 at 15:25
  • ...so each time queue.get() returns you want to call queue.task_done() once you're done processing that url. You want to call task_done() when you've finished processing that url (i.e. just before you go back around the while loop). – Jon Cage May 29 '12 at 15:25
  • If you started all the threads after adding 10 items (say) to the initial queue, you'd see them all fire off immediately. Since you're only currently pulling a single item off the queue and you've only put one on there, you'll pull one item off then call task_done() indicating that all threads can be rejoined. – Jon Cage May 29 '12 at 15:27
  • let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/11882/discussion-between-torayeff-and-jon-cage) – torayeff May 29 '12 at 15:27