
I wrote a script that resolves multiple hostnames to IP addresses using multithreading.

However, it freezes at some random point. How can this be solved?

import socket
import pymysql
from queue import Queue      # "from Queue import Queue" on Python 2
from threading import Thread

num_threads = 100
conn = pymysql.connect(host='xx.xx.xx.xx', unix_socket='/tmp/mysql.sock', user='user', passwd='pw', db='database')
cur = conn.cursor()
def mexec(befehl):
    cur = conn.cursor()
    cur.execute(befehl)

websites = ['facebook.com','facebook.org', ...] # 10,000 websites in the list
queue = Queue()
def getips(i, q):
    while True:
        #--resolve IP--
        try:
            result = socket.gethostbyname_ex(site)
            print(result)
            mexec("UPDATE sites2block SET ip='"+result+"', updated='yes' ") #puts site in mysqldb
        except (socket.gaierror):
            print("no ip")
            mexec("UPDATE sites2block SET ip='no ip', updated='yes',")
        q.task_done()
#Spawn thread pool
for i in range(num_threads):
    worker = Thread(target=getips, args=(i, queue))
    worker.setDaemon(True)
    worker.start()
#Place work in queue
for site in websites:
    queue.put(site)
#Wait until worker threads are done to exit
queue.join()
user670186
  • Sorry, forgot to accept! I don't get specific errors; the script runs and at some point just freezes without showing anything. Then I have to kill the shell. – user670186 Feb 08 '12 at 14:10
  • The example is incomplete - what is `mexec`? – Jean-Paul Calderone Feb 08 '12 at 14:15
  • Of course you don't get specific errors - you fail to evaluate the exception. Just change to `except socket.gaierror, e:` and add a `print e, repr(e)` or something. The deadlock, however, seems to result from failure to check if the queue is exhausted. In this case, you should quit the thread. – glglgl Feb 08 '12 at 14:37

3 Answers


You could use a sentinel value to signal threads that there is no work and join the threads instead of queue.task_done() and queue.join():

#!/usr/bin/env python
import socket
from Queue import Queue
from threading import Thread

def getips(queue):
    for site in iter(queue.get, None):
        try: # resolve hostname
            result = socket.gethostbyname_ex(site)
        except IOError, e:
            print("error %s reason: %s" % (site, e))
        else:
            print("done %s %s" % (site, result))

def main():
    websites = "youtube google non-existent.example facebook yahoo live".split()
    websites = [name+'.com' for name in websites]

    # Spawn thread pool
    queue = Queue()
    threads = [Thread(target=getips, args=(queue,)) for _ in range(20)]
    for t in threads:
        t.daemon = True
        t.start()

    # Place work in queue
    for site in websites: queue.put(site)
    # Put sentinel to signal the end
    for _ in threads: queue.put(None)
    # Wait for completion
    for t in threads: t.join()

main()

The gethostbyname_ex() function is obsolete. To support both IPv4 and IPv6 addresses, you could use socket.getaddrinfo() instead.
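For example, a minimal sketch of resolving via getaddrinfo() (the `resolve` helper name is just an illustration):

```python
import socket

def resolve(host):
    """Return all IPv4/IPv6 addresses for host, or [] on failure."""
    try:
        # Each entry is (family, type, proto, canonname, sockaddr);
        # sockaddr[0] is the address string for both AF_INET and AF_INET6.
        infos = socket.getaddrinfo(host, None)
    except socket.gaierror:
        return []
    return sorted({info[4][0] for info in infos})

print(resolve("localhost"))  # e.g. ['127.0.0.1', '::1']
```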

jfs
    Python 2.x uses `from Queue import Queue`. Python 3.x - `from queue import Queue` to comply with [pep-8](http://www.python.org/dev/peps/pep-0008/) module naming convention. To avoid confusion you could use tag `python-3.x` when you ask question about `python-3.x`. – jfs Feb 11 '12 at 00:53

My first idea was that you get errors due to overload on the DNS - maybe your resolver just doesn't allow you to do more than a certain number of queries per unit of time.


Besides, I spotted some issues:

  1. You forgot to assign `site` inside the `while` loop - which would probably better be replaced by a `for` loop iterating over the queue. In your version, every thread reads the `site` variable from the module-level namespace, which can lead to some hosts being queried twice and others skipped.

    At this point, you also have control over whether the queue still has entries or awaits some. If neither, you can quit the thread.

  2. For security reasons, you would better do

    def mexec(befehl, args=None):
        cur = conn.cursor()
        cur.execute(befehl, args)
    

    in order to do afterwards

    mexec("UPDATE sites2block SET ip=%s, updated='yes'", (result,)) #puts site in mysqldb
    

In order to stay compatible with future protocols, you should use socket.getaddrinfo() instead of socket.gethostbyname_ex(site). There you get all the IPs you want (at first you can limit yourself to IPv4, which makes switching to IPv6 easier later) and could put them all into the DB.


For your queue, code samples could be

from Queue import Empty  # on Python 3: from queue import Empty

def queue_iterator(q):
    """Iterate over the contents of a queue. Waits for new elements as long as the queue is still filling."""
    while True:
        try:
            item = q.get(block=q.is_filling, timeout=.1)
            yield item
            q.task_done() # indicate that task is done.
        except Empty:
            # If q is still filling, continue.
            # If q is empty and not filling any longer, return.
            if not q.is_filling: return

def getips(i, q):
    for site in queue_iterator(q):
        #--resolve IP--
        try:
            result = socket.gethostbyname_ex(site)
            print(result)
            mexec("UPDATE sites2block SET ip=%s, updated='yes'", (result,)) #puts site in mysqldb
        except socket.gaierror:
            print("no ip")
            mexec("UPDATE sites2block SET ip='no ip', updated='yes'")
# Indicate it is filling.
q.is_filling = True
#Spawn thread pool
for i in range(num_threads):
    worker = Thread(target=getips, args=(i, queue))
    worker.setDaemon(True)
    worker.start()
#Place work in queue
for site in websites:
    queue.put(site)
queue.is_filling = False # we are done filling, if q becomes empty, we are done.
#Wait until worker threads are done to exit
queue.join()

should do the trick.


Another issue is your parallel inserting into MySQL. You are only allowed to run one query at a time on a single MySQL connection. So you could either protect the access via threading.Lock() or RLock(), or you could put the results into another queue that is processed by a single writer thread, which could even bundle them.
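The second option could look like this (a sketch; the `results` queue and `writer` thread are my own names, and `collected.append` stands in for the real `mexec()` call):

```python
import threading
try:
    import queue           # Python 3
except ImportError:
    import Queue as queue  # Python 2

results = queue.Queue()

def writer(write_to_db):
    # Drain the results queue in a single thread, so only one
    # thread ever talks to the database connection.
    for item in iter(results.get, None):  # None = shutdown sentinel
        write_to_db(item)

collected = []                    # stands in for the MySQL table
t = threading.Thread(target=writer, args=(collected.append,))
t.start()

# Resolver threads would call results.put((site, ip)) instead of mexec()
results.put(("facebook.com", "66.220.149.11"))
results.put(None)                 # tell the writer to stop
t.join()
print(collected)  # [('facebook.com', '66.220.149.11')]
```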

glglgl
  • Hi, thanks! For 1., could you post corrected code? I just don't get it to work... – user670186 Feb 08 '12 at 14:47
  • @user670186 Done. Just corrected this; the other stuff is not integrated. – glglgl Feb 08 '12 at 15:29
  • it might be simpler to use blocking `iter(q.get, None)` and a sentinel: `for i in range(num_threads): q.put(None)` and join the threads instead of unreliable `q.task_done(), q.join(), q.is_filling` – jfs Feb 08 '12 at 19:47
  • I have thought about a sentinel as well, but in this case you need several ones (at least as many as you have threads), because every thread needs to get one in order to stop. – glglgl Feb 08 '12 at 23:48
  • 1
    the `for`-loop in my comment does exactly that (add a sentinel for each thread). btw, `queue.get()` has no `wait` argument. – jfs Feb 09 '12 at 17:54
  • Oh, you are right! I didn't see it at 1st glance... And `s/wait/block/`. – glglgl Feb 09 '12 at 19:37
  • Hi, thanks for the code, but it shows error: Name error q is not defined. Seems q.is_filling is at the wrong place... – user670186 Feb 10 '12 at 15:47
  • works, but following errors: Traceback (most recent call last): File "/usr/local/lib/python3.2/threading.py", line 736, in _bootstrap_inner self.run() File "/usr/local/lib/python3.2/threading.py", line 689, in run self._target(*self._args, **self._kwargs) File "resolve_t2.py", line 132, in getips for site in queue_iterator(q): File "resolve_t2.py", line 126, in queue_iterator except Empty: NameError: global name 'Empty' is not defined – user670186 Feb 10 '12 at 16:00
  • if I remove 'Empty' it works, but the MySQL insert commands show errors. Seems I have to write the SQL commands with resolved IPs to a file first and then execute them afterwards to not get into trouble with MySQL. Is there a way to distribute the IPs equally among all threads during running? Meaning, some threads are much faster than others; could some host names be moved to others to counterbalance and make it faster? – user670186 Feb 10 '12 at 16:13
  • Yes, but it would be too extensive to answer here. Better put a new question on this... `Empty` [is defined in the `Queue` module](http://docs.python.org/library/queue.html#Queue.Empty) - you only have to import it or access it fully qualified. – glglgl Feb 10 '12 at 16:52
  • thanks! As said, to avoid MySQL exceptions I have to first write all the "UPDATE......." MySQL commands to a text file and then execute them. Can anyone comment on why MySQL cannot handle many UPDATE commands at once? Also, the maximum number of threads seems to be around 350. Why that number? – user670186 Feb 10 '12 at 17:10
  • I've written [an example that demonstrates how to use sentinels](http://stackoverflow.com/a/9232406/4279) – jfs Feb 10 '12 at 17:53
  • It is not the problem of "many" `UPDATE`s, but of parallel ones. See my edit. – glglgl Feb 10 '12 at 20:24

You might find it simpler to use concurrent.futures than using threading, multiprocessing, or Queue directly:

#!/usr/bin/env python3
import socket
# pip install futures on Python 2.x
from concurrent.futures import ThreadPoolExecutor as Executor

hosts = "youtube.com google.com facebook.com yahoo.com live.com".split()*100
with Executor(max_workers=20) as pool:
    for results in pool.map(socket.gethostbyname_ex, hosts, timeout=60):
        print(results)

Note: you could easily switch from using threads to processes:

from concurrent.futures import ProcessPoolExecutor as Executor

You need that if gethostbyname_ex() is not thread-safe on your OS, e.g., this might be the case on OS X.

If you'd like to process exceptions that might arise in gethostbyname_ex():

import concurrent.futures

with Executor(max_workers=20) as pool:
    future2host = dict((pool.submit(socket.gethostbyname_ex, h), h)
                       for h in hosts)
    for f in concurrent.futures.as_completed(future2host, timeout=60):
        e = f.exception()
        print(f.result() if e is None else "{0}: {1}".format(future2host[f], e))

It's similar to the example from the docs.

jfs