
I have this piece of Python code:

# requires: import os, subprocess, threading, time -- and from Queue import Queue

def __init__(self):
  self.ip_list=[]
  self.queue=Queue()

  # start five daemon worker threads; each consumes one command from the queue
  for i in range(5):
    worker=threading.Thread(target=self.__executeCmd, name="executeCmd("+str(i)+")")
    worker.setDaemon(True)
    worker.start()

  # one command per service; whichever answers first wins
  self.queue.put(["wget", "-qO-", "http://ipecho.net/plain"])
  self.queue.put(["curl", "http://www.networksecuritytoolkit.org/nst/cgi-bin/ip.cgi"])
  self.queue.put(["curl", "v4.ident.me"])
  self.queue.put(["curl", "ipv4.icanhazip.com"])
  self.queue.put(["curl", "ipv4.ipogre.com"])

def __executeCmd(self):
  cmd=self.queue.get()
  try:
    rc=subprocess.check_output(cmd, stderr=open(os.devnull, 'w')).strip()
  except:
    # command not installed or failed; mark the job done and end the thread
    self.queue.task_done()
    return
  if self.is_valid_ip(rc):
    self.ip_list.append(rc)
  self.queue.task_done()

def waitForIP(self, wait_in_sec):
  # poll every 0.1 s until a worker has delivered an IP, or time out with ""
  cnt=wait_in_sec*10
  while self.ip_list==[]:
    time.sleep(0.1)
    cnt-=1
    if cnt<=0:
      return ""
  return self.ip_list[0]

It's for querying the external IP address from five URLs and using the response from whichever one is delivered first.

But sometimes I get this (I see it by email, because the job is run from crontab):

Exception in thread executeCmd(0) (most likely raised during interpreter shutdown):
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
  File "/usr/lib/python2.7/threading.py", line 505, in run
  File "/home/dede/bin/tunnel_watchdog.py", line 115, in __executeCmd
  File "/usr/lib/python2.7/Queue.py", line 65, in task_done
  File "/usr/lib/python2.7/threading.py", line 296, in notifyAll
<type 'exceptions.TypeError'>: 'NoneType' object is not callable

I think it's because the script has already ended while a thread was still running, and that thread then returned from subprocess.check_output().

Is there a way to avoid this (without waiting until all five URLs have delivered their data)?
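For illustration, one blunt guard against just this symptom (a sketch; this rewrite is an assumption, not part of the original post) is to let the worker swallow anything that goes wrong around task_done(), so a thread that wakes up during interpreter shutdown dies silently instead of printing a traceback:

def __executeCmd(self):
  cmd=self.queue.get()
  try:
    rc=subprocess.check_output(cmd, stderr=open(os.devnull, 'w')).strip()
    if self.is_valid_ip(rc):
      self.ip_list.append(rc)
  except:
    pass  # failed command -- or module globals already torn down at shutdown
  try:
    self.queue.task_done()  # the call that raised during shutdown
  except:
    pass  # bare except on purpose: even exception names may be gone at teardown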

  • *Something* will have to wait. – Lasse V. Karlsen Apr 30 '14 at 09:53
  • Why do you use a Queue? If it's only to pass the values to each sub-thread, that's overkill. For example, a plain list would do (as long as you fill it before starting the threads, and use pop() or pop(0) to get the next item). It might work around the problem shown here, at least... – Armin Rigo Apr 30 '14 at 12:04

1 Answer


The task is simpler than it looks. Here's one implementation using the multiprocessing module.

pool.imap_unordered() runs the jobs in parallel and yields each result as soon as it completes, fastest first. The loop below checks each result as it arrives; as soon as a valid IP is found, it is printed, the pool is terminated, and the whole program exits. It doesn't wait for the other jobs to complete.

import multiprocessing, re, subprocess, sys

# one command per service; each worker process runs one of them
CMD_LIST = [
    ["wget", "-qO-", "http://ipecho.net/plain"],
    ["curl", '-s', "http://www.networksecuritytoolkit.org/nst/cgi-bin/ip.cgi"],
    ["curl", '-s', "v4.ident.me"],
    ["curl", '-s', "ipv4.icanhazip.com"],
    ["curl", '-s', "ipv4.ipogre.com"],
]

# loose match: a run of at least seven digits and dots looks like an IPv4 address
ip_pat = re.compile('[0-9.]{7,}')

pool = multiprocessing.Pool(5)
# results arrive in completion order, not submission order
for output in pool.imap_unordered(subprocess.check_output, CMD_LIST):
    print 'output:', output
    m = ip_pat.search(output)
    if m:
        print 'GOT IP:', m.group(0)
        pool.terminate()   # kill the workers still waiting on slower services
        sys.exit(0)

print 'no IP found'
  • Works great, if I use a "wrapper" for the check_output like: `def spco(cmd): return(subprocess.check_output(cmd, stderr=open(os.devnull, 'w')))` (expanded below) – dede Jun 04 '14 at 06:05
  • @dede: to collect output from multiple processes concurrently, you could use a thread pool here: `multiprocessing.pool.ThreadPool` -- the interface is identical. Here's [`asyncio`-based solution](http://stackoverflow.com/a/23616229/4279) – jfs Jan 14 '15 at 12:23
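Expanded, dede's wrapper might look like this (a sketch; everything beyond the one-liner in the comment is an assumption). A module-level function like this can be passed to pool.imap_unordered in place of subprocess.check_output:

import os, subprocess

def spco(cmd):
    # silent check_output: discard stderr so curl/wget noise never reaches the console
    with open(os.devnull, 'w') as devnull:
        return subprocess.check_output(cmd, stderr=devnull)

And a minimal sketch of the multiprocessing.pool.ThreadPool variant jfs mentions; the shortened CMD_LIST and the ip_pat regex stand in for the ones in the answer, and only the import and the constructor differ from the process-based version:

from multiprocessing.pool import ThreadPool   # same interface as multiprocessing.Pool
import re, subprocess, sys

CMD_LIST = [
    ["wget", "-qO-", "http://ipecho.net/plain"],
    ["curl", "-s", "ipv4.icanhazip.com"],
]
ip_pat = re.compile('[0-9.]{7,}')

pool = ThreadPool(len(CMD_LIST))   # threads instead of processes: cheaper for I/O-bound jobs
for output in pool.imap_unordered(subprocess.check_output, CMD_LIST):
    m = ip_pat.search(output)
    if m:
        print 'GOT IP:', m.group(0)
        pool.terminate()
        sys.exit(0)
print 'no IP found'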