My first idea was that you are getting errors because of overloading the DNS server - maybe your resolver simply doesn't allow more than a certain number of queries per time period.
Besides that, I spotted some issues:
You forgot to assign site correctly in the while loop - which would probably be better replaced by a for loop iterating over the queue, or something similar. In your version, you use the site variable from the module-level namespace, which can lead to some queries being made twice and others being skipped. Inside that loop you also have control over whether the queue still has entries or is still being filled; if neither is the case, you can quit your thread. See the sketch right below.
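To make the difference concrete, here is a hypothetical reduction of your worker (not your actual code; q stands for the work queue):

import socket

# Before: `site` is never assigned inside the loop, so every thread reads the
# module-level `site` - some names end up being resolved twice, others never.
#
#     while True:
#         result = socket.gethostbyname_ex(site)
#         ...
#
# After: each pass through the loop fetches its own item from the queue.
def worker(q):
    while True:
        site = q.get()
        print(socket.gethostbyname_ex(site))
        q.task_done()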
For security reasons (letting the database driver do the quoting protects you from SQL injection), you would be better off doing
def mexec(befehl, args=None):
    cur = conn.cursor()
    cur.execute(befehl, args)
so that afterwards you can do
mexec("UPDATE sites2block SET ip=%s, updated='yes'", result) #puts site in mysqldb
In order to stay compatible with future protocols, you should use socket.getaddrinfo() instead of socket.gethostbyname_ex(site). It gives you all the IPs you want (at first you can limit yourself to IPv4, but switching to IPv6 later becomes easier), and you could even put all of them into the DB.
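A minimal sketch of what that could look like (resolve_ips is a made-up helper name, not part of your code):

import socket

def resolve_ips(site):
    """Return every distinct IP address for a host name, IPv4 and IPv6 alike."""
    # getaddrinfo yields tuples (family, type, proto, canonname, sockaddr);
    # the address string is the first field of sockaddr for both families.
    infos = socket.getaddrinfo(site, None)      # port None: we only want addresses
    return sorted({sockaddr[0] for _, _, _, _, sockaddr in infos})

# To limit yourself to IPv4 for now, pass the family explicitly:
#     socket.getaddrinfo(site, None, socket.AF_INET)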
For your queue, something like
import socket
from queue import Empty     # Python 3; on Python 2 the module is called Queue
from threading import Thread

def queue_iterator(q):
    """Iterate over the contents of a queue. Waits for new elements as long as the queue is still filling."""
    while True:
        try:
            item = q.get(block=q.is_filling, timeout=.1)
            yield item
            q.task_done()  # indicate that the task is done.
        except Empty:
            # If q is still filling, continue.
            # If q is empty and not filling any longer, return.
            if not q.is_filling:
                return

def getips(i, q):
    for site in queue_iterator(q):
        # --resolve IP--
        try:
            result = socket.gethostbyname_ex(site)
            print(result)
            mexec("UPDATE sites2block SET ip=%s, updated='yes'", result)  # puts site in mysqldb
        except socket.gaierror:
            print("no ip")
            mexec("UPDATE sites2block SET ip='no ip', updated='yes'")

# Indicate that the queue is filling.
queue.is_filling = True

# Spawn thread pool
for i in range(num_threads):
    worker = Thread(target=getips, args=(i, queue))
    worker.daemon = True
    worker.start()

# Place work in queue
for site in websites:
    queue.put(site)
queue.is_filling = False  # we are done filling; if q becomes empty, we are done.

# Wait until worker threads are done to exit
queue.join()
should do the trick.
Another issue is your parallel inserting into MySQL: on a single connection, only one query may be running at a time. So you could either protect the access with a threading.Lock() or RLock(), or you could put the answers into another queue that is processed by a single dedicated thread, which could even bundle them. The lock variant is sketched below.
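A minimal sketch of the Lock() variant, reusing the mexec() helper from above (db_lock is a new name introduced here; conn is your existing MySQL connection):

import threading

db_lock = threading.Lock()   # one lock shared by all worker threads

def mexec(befehl, args=None):
    # Only one thread at a time may talk to the single MySQL connection.
    with db_lock:
        cur = conn.cursor()
        cur.execute(befehl, args)

The queue variant works like the DNS workers themselves: the resolver threads put their results into a second queue, and one writer thread is the only one that ever touches the connection.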