Is there anything I can do to the code below (I thought sessions would solve this?) to prevent new TCP connections being created with each GET request? I am hitting around 1000 requests a second, and after around 10,000 requests I run out of sockets:

import json
from multiprocessing import Pool, cpu_count
from urllib3 import HTTPConnectionPool

def ReqOsrm(url_input):
    ul, qid = url_input
    conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=1)
    try:
        response = conn_pool.request('GET', ul)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
            return out
        else:
            print("Done but no route: %d %s" % (qid, req_url))
            return [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("%s: %d %s" % (err, qid, req_url))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

# run:
pool = Pool(int(cpu_count()))
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()

HTTPConnectionPool(host='127.0.0.1', port=5005): Max retries exceeded with url: /viaroute?loc=44.779708,4.2609877&loc=44.648439,4.2811959&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))


Eric - thank you a lot for the response; I think it's exactly what I need. However, I can't quite modify it correctly. The code correctly returns 10,000 responses for the first few cycles, but then it seems to break and returns fewer than 10,000 results, which leads me to think I implemented the Queue incorrectly?


import csv
import json
import os
import threading
import time
from multiprocessing import Process, JoinableQueue, Queue, cpu_count

import pandas as pd
from urllib3 import HTTPConnectionPool

ghost = 'localhost'
gport = 8989

def CreateUrls(routes, ghost, gport):
    return [
        ["http://{0}:{1}/route?point={2}%2C{3}&point={4}%2C{5}&vehicle=car&calc_points=false&instructions=false".format(
            ghost, gport, alat, alon, blat, blon),
            qid] for qid, alat, alon, blat, blon in routes]


def LoadRouteCSV(csv_loc):
    if not os.path.isfile(csv_loc):
        raise Exception("Could not find CSV with addresses at: %s" % csv_loc)
    else:
        return pd.read_csv(csv_loc, sep=',', header=None, iterator=True, chunksize=1000 * 10)

class Worker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self._qin = qin
        self._qout = qout

    def run(self):
        # Create threadsafe connection pool
        conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=10)

        class Consumer(threading.Thread):
            def __init__(self, qin, qout):
                threading.Thread.__init__(self)
                self.__qin = qin
                self.__qout = qout

            def run(self):
                while True:
                    msg = self.__qin.get()
                    ul, qid = msg
                    try:
                        response = conn_pool.request('GET', ul)
                        s = float(response.status)
                        if s == 200:
                            json_geocode = json.loads(response.data.decode('utf-8'))
                            tot_time_s = json_geocode['paths'][0]['time']
                            tot_dist_m = json_geocode['paths'][0]['distance']
                            out = [qid, s, tot_time_s, tot_dist_m]
                        elif s == 400:
                            print("Done but no route for row: ", qid)
                            out = [qid, 999, 0, 0]
                        else:
                            print("Done but unknown error for: ", s)
                            out = [qid, 999, 0, 0]
                    except Exception as err:
                        print(err)
                        out = [qid, 999, 0, 0]
                    self.__qout.put(out)
                    self.__qin.task_done()

        num_threads = 10
        [Consumer(self._qin, self._qout).start() for _ in range(num_threads)]

if __name__ == '__main__':
    done_count = 0
    try:
        with open(os.path.join(directory_loc, 'gh_output.csv'), 'w') as outfile:
            wr = csv.writer(outfile, delimiter=',', lineterminator='\n')
            for x in LoadRouteCSV(csv_loc=os.path.join(directory_loc, 'gh_input.csv')):
                routes = x.values.tolist()
                url_routes = CreateUrls(routes, ghost, gport)
                del routes

                stime = time.time()

                qout = Queue()
                qin = JoinableQueue()
                [qin.put(url_q) for url_q in url_routes]
                [Worker(qin, qout).start() for _ in range(cpu_count())]
                # Block until all urls in qin are processed
                qin.join()
                calc_routes = []
                while not qout.empty():
                    calc_routes.append(qout.get())

                # Time diagnostics
                dur = time.time() - stime
                print("Calculated %d distances in %.2f seconds: %.0f per second" % (len(calc_routes),
                                                                                    dur,
                                                                                    len(calc_routes) / dur))
                del url_routes
                wr.writerows(calc_routes)
                done_count += len(calc_routes)
                # Continually update progress
                print("Saved %d calculations" % done_count)
    except Exception as err:
        print(err)
mptevsion
    Hi, your problem is described here: [You are overloading the TCP/IP stack...](http://stackoverflow.com/a/1339240) ;) – kamy22 Mar 04 '16 at 16:07
  • 1
    The issue to me seems to be that you're creating a new connection pool for every URL you request. Why not have a connection pool per process and reuse connections? – Eric Conner Mar 04 '16 at 16:09
  • @EricConner is this as simple as creating a global `conn_pool = HTTPConnectionPool(host='localhost', port=5000, maxsize=int(cpu_count()))` and then just referencing it in the function `response = conn_pool.request('GET', req_url)` which is called by `calc_routes = pool.map(ReqOsrm, url_routes)`? Reason I ask is because for one server (Graphhopper) this works super well (and the speed improved by 3x), however if I change the server port to the OSRM server it becomes super slow compared to before (100x slower). Could it be that some servers do not allow keep-alive/sessions? – mptevsion Mar 04 '16 at 17:11
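
For reference, a minimal sketch of the per-process pool that the comment above describes: the pool lives at module level, so each worker process builds it once and every request in that process reuses its keep-alive connections. The host, port, and placeholder URL are taken from the question:

from multiprocessing import Pool, cpu_count

from urllib3 import HTTPConnectionPool

# Built once per worker process (at import time), then shared by every
# ReqOsrm call in that process, so sockets are kept alive and reused.
conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=int(cpu_count()))

def ReqOsrm(url_input):
    ul, qid = url_input
    response = conn_pool.request('GET', ul)  # reuses an open connection
    return [qid, response.status]

if __name__ == '__main__':
    # A single placeholder (url, qid) pair in the question's format:
    url_routes = [("/viaroute?loc=44.779708,4.2609877&loc=44.648439,4.2811959&alt=false&geometry=false", 0)]
    pool = Pool(cpu_count())
    calc_routes = pool.map(ReqOsrm, url_routes)
    pool.close()
    pool.join()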

2 Answers


I was thinking something more like this. The idea is to spawn a process per core and a pool of threads per process. Each process has a separate connection pool, which is shared among the threads in that process. I don't think you can get a more performant solution without some kind of threading.

from multiprocessing import Pool, cpu_count
import queue
import threading

from urllib3 import HTTPConnectionPool


def ReqOsrm(url_input):
    # Create threadsafe connection pool, shared by all threads in this process
    conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=1000)

    # Create consumer thread class
    class Consumer(threading.Thread):
        def __init__(self, work_queue):
            threading.Thread.__init__(self)
            self._queue = work_queue

        def run(self):
            while True:
                msg = self._queue.get()
                try:
                    response = conn_pool.request('GET', msg)
                    print(response)
                except Exception as err:
                    print(err)
                self._queue.task_done()

    # Create work queue and a pool of workers
    work_queue = queue.Queue()
    num_threads = 20
    workers = []
    for _ in range(num_threads):
        worker = Consumer(work_queue)
        # Daemon threads, so they don't keep the process alive once the queue drains
        worker.daemon = True
        worker.start()
        workers.append(worker)

    for url in url_input:
        work_queue.put(url)

    work_queue.join()

url_routes = [
    ["/proc1-0", "/proc1-1"],
    ["/proc2-0", "/proc2-1"],
    ["/proc3-0", "/proc3-1"],
    ["/proc4-0", "/proc4-1"],
    ["/proc5-0", "/proc5-1"],
    ["/proc6-0", "/proc6-1"],
    ["/proc7-0", "/proc7-1"],
    ["/proc8-0", "/proc8-1"],
    ["/proc9-0", "/proc9-1"],
]

pool = Pool(int(cpu_count()))
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()
Eric Conner
  • thanks! I think this is nearly exactly what I need. I have a small issue with the queue (to capture results), which I have edited into the bottom of my post. Would be grateful if you could help out with that – mptevsion Mar 06 '16 at 03:24
  • It's difficult to tell from that snippet what's going wrong. You said it calculates double? Does that mean you end up hitting the same URLs multiple times? How are you splitting up the URLs among processes? Maybe post a fuller version of the script? – Eric Conner Mar 06 '16 at 18:41
  • apologies but I described the problem incorrectly. I seem to be skipping results. I have posted pretty much the full script now in my edited post. Thank you once again - a bit frustrating as I hope I'm close to getting this sorted :) – mptevsion Mar 06 '16 at 19:41
  • Ah, silly me! I needed to add `self._qin.join()` to my process class (as well as having `qin.join()` in my main code). At least, that seems to have solved this – mptevsion Mar 06 '16 at 20:05

Appreciate the help - my working solution:

class Worker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self._qin = qin
        self._qout = qout

    def run(self):
        # Create threads to run in process
        class Consumer(threading.Thread):
            def __init__(self, qin, qout):
                threading.Thread.__init__(self)
                self.__qin = qin
                self.__qout = qout

            def run(self):
                # Close once queue empty (otherwise process will linger)
                while not self.__qin.empty():
                    msg = self.__qin.get()
                    ul, qid = msg
                    try:
                        response = conn_pool.request('GET', ul)
                        s = float(response.status)
                        if s == 200:
                            json_geocode = json.loads(response.data.decode('utf-8'))
                            tot_time_s = json_geocode['paths'][0]['time']
                            tot_dist_m = json_geocode['paths'][0]['distance']
                            out = [qid, s, tot_time_s, tot_dist_m]
                        elif s == 400:
                            #print("Done but no route for row: ", qid)
                            out = [qid, 999, 0, 0]
                        else:
                            print("Done but unknown error for: ", s)
                            out = [qid, 999, 0, 0]
                    except Exception as err:
                        print(err)
                        out = [qid, 999, 0, 0]
                    #print(out)
                    self.__qout.put(out)
                    self.__qin.task_done()

        # Create thread-safe connection pool
        concurrent = 10
        conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=concurrent)
        num_threads = concurrent
        # Start threads (concurrent) per process
        [Consumer(self._qin, self._qout).start() for _ in range(num_threads)]
        # Block until all urls in self._qin are processed
        self._qin.join()
        return

if __name__ == '__main__':
    # Fill queue input (url_routes built as in the question above)
    qin = JoinableQueue()
    [qin.put(url_q) for url_q in url_routes]
    # Queue to collect output
    qout = Queue()
    # Start cpu_count processes (each launches its own threads and connection pool)
    workers = []
    for _ in range(cpu_count()):
        workers.append(Worker(qin, qout))
        workers[-1].start()
    # Block until all urls in qin are processed
    qin.join()
    # Fill routes
    calc_routes = []
    while not qout.empty():
        calc_routes.append(qout.get())
    del qin, qout
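
One caveat with the `while not self.__qin.empty()` exit condition: two consumer threads can both see a non-empty queue, one wins the final `get()`, and the other blocks in `get()` indefinitely. A common alternative is to enqueue one `None` sentinel per consumer thread; here is a self-contained toy sketch of that pattern (the doubling stands in for the real request/parse work):

import threading
from queue import Queue  # the same pattern works with multiprocessing.JoinableQueue

def consumer(q, results):
    while True:
        msg = q.get()
        if msg is None:          # sentinel: no more work for this thread
            q.task_done()
            break
        results.append(msg * 2)  # stand-in for the HTTP request and parsing
        q.task_done()

q = Queue()
results = []
threads = [threading.Thread(target=consumer, args=(q, results)) for _ in range(4)]
for t in threads:
    t.start()
for item in range(100):
    q.put(item)
for _ in range(len(threads)):    # exactly one sentinel per consumer thread
    q.put(None)
q.join()                         # returns once all items and sentinels are processed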
mptevsion