I'm writing a simple script that:
- Loads a big list of URLs
- Gets the content of each URL by making concurrent HTTP requests with requests' async module
- Parses the content of the page with lxml in order to check if a link is in the page
- If the link is present on the page, saves some info about the page in a ZODB database
When I test the script with 4 or 5 URLs it works well; the only oddity is the following message when the script ends:
Exception KeyError: KeyError(45989520,) in <module 'threading' from '/usr/lib/python2.7/threading.pyc'> ignored
But when I try to check about 24,000 URLs, it fails toward the end of the list (when there are about 400 URLs left to check) with the following error:
Traceback (most recent call last):
File "check.py", line 95, in <module>
File "/home/alex/code/.virtualenvs/linka/local/lib/python2.7/site-packages/requests/async.py", line 83, in map
File "/home/alex/code/.virtualenvs/linka/local/lib/python2.7/site-packages/gevent-1.0b2-py2.7-linux-x86_64.egg/gevent/greenlet.py", line 405, in joinall
ImportError: No module named queue
Exception KeyError: KeyError(45989520,) in <module 'threading' from '/usr/lib/python2.7/threading.pyc'> ignored
I tried both the version of gevent available on PyPI and the latest version (1.0b2), downloaded and installed from the gevent repository.
I cannot understand why this happens, or why it happens only when I check a large number of URLs. Any suggestions?
Here is the entire script:
from requests import async, defaults
from lxml import html
from urlparse import urlsplit
from gevent import monkey
from BeautifulSoup import UnicodeDammit
from ZODB.FileStorage import FileStorage
from ZODB.DB import DB
import transaction
import persistent
import random
storage = FileStorage('Data.fs')
db = DB(storage)
connection = db.open()
root = connection.root()
monkey.patch_all()
defaults.defaults['base_headers']['User-Agent'] = "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
defaults.defaults['max_retries'] = 10
def save_data(source, target, anchor):
    """Store the link found on page *source* in the ZODB root and commit.

    The record is a PersistentMapping with the keys ``target`` and
    ``anchor``, saved under the key *source*. The transaction is committed
    immediately after the assignment.
    """
    record = persistent.mapping.PersistentMapping(
        {'target': target, 'anchor': anchor})
    root[source] = record
    transaction.commit()
def decode_html(html_string):
    """Return *html_string* converted to unicode by UnicodeDammit.

    Args:
        html_string: raw page content as fetched from the server.

    Returns:
        The unicode text produced by UnicodeDammit.

    Raises:
        UnicodeDecodeError: if UnicodeDammit produced no (or empty) unicode
            text for the input.
    """
    converted = UnicodeDammit(html_string, isHTML=True)
    if not converted.unicode:
        # UnicodeDecodeError takes exactly five arguments
        # (encoding, object, start, end, reason). The previous two-argument
        # call raised TypeError instead of the intended error, and never
        # interpolated the "%s" placeholder in the message.
        raw = html_string if isinstance(html_string, bytes) else b""
        raise UnicodeDecodeError(
            "unknown", raw, 0, len(raw),
            "Failed to detect encoding, tried [%s]"
            % ', '.join(converted.triedEncodings))
    return converted.unicode
def find_link(html_doc, url):
    """Search *html_doc* for an absolute ``href`` link pointing at example.org.

    The document is decoded with decode_html(), re-encoded as UTF-8 and
    parsed with lxml. Returns a ``(url, link, anchor_text)`` tuple for the
    first link whose network location contains "example.org", or False when
    no link matches.
    """
    tree = html.document_fromstring(decode_html(html_doc).encode('utf-8'))
    for element, attribute, link, _pos in tree.iterlinks():
        # Only absolute links carried by an href attribute are of interest.
        if attribute != "href" or not link.startswith('http'):
            continue
        if "example.org" in urlsplit(link).netloc:
            return (url, link, element.text_content().strip())
    # The loop finished without finding a matching link.
    return False
def check(response):
    """Response hook: record a matching link, then update the progress count.

    For a 200 response the page body is scanned with find_link(); when a
    link is found it is persisted through save_data(). The module-level
    ``todo`` counter is then decremented and printed.
    """
    global todo
    if response.status_code == 200:
        match = find_link(response.content, response.url)
        if match:
            # match is the (source, target, anchor) tuple built by find_link().
            save_data(*match)
    todo -= 1
    print(todo)
def load_urls(fname):
    """Read URLs from *fname* and return them de-duplicated and shuffled.

    Each line is stripped of surrounding whitespace. Blank lines are
    ignored so that no empty-string "URL" is handed to the HTTP layer.

    Args:
        fname: path of a text file containing one URL per line.

    Returns:
        A list of unique, non-empty URL strings in random order.
    """
    with open(fname) as fh:
        # Iterate the file lazily instead of materialising readlines().
        urls = set(line.strip() for line in fh)
    # Blank or whitespace-only lines all collapse to '' after strip().
    urls.discard('')
    urls = list(urls)
    random.shuffle(urls)
    return urls
if __name__ == "__main__":
urls = load_urls('urls.txt')
rs = []
todo = len(urls)
print "Ready to analyze %s pages" % len(urls)
for url in urls:
rs.append(async.get(url, hooks=dict(response=check), timeout=10.0))
responses = async.map(rs, size=100)
print "DONE."