
I am connecting to a local server (OSRM) via HTTP to submit routes and get back drive-times. I notice that asynchronous I/O is slower than threading, because the waiting period for the calculation seems smaller than the time it takes to send the request and process the JSON output (I think async I/O is better when the server takes some time to process your request - you don't want it to be blocking while you wait - but that isn't my case). Threading suffers from the Global Interpreter Lock, so it appears (with evidence below) that my fastest option is to use multiprocessing.

The issue with multiprocessing is that it is so fast that it exhausts my sockets and I get an error (requests opens a new connection for each request). In serial code I can use a requests.Session() object to keep a connection alive, but I can't get this working in parallel (each process would need its own session).

The closest code I have to working at the moment is this multiprocessing code:

import json
from multiprocessing import Pool, cpu_count
from urllib3 import HTTPConnectionPool

conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=cpu_count())

def ReqOsrm(url_input):
    ul, qid = url_input      
    try:
        response = conn_pool.request('GET', ul)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
            return out
        else:
            print("Done but no route: %d %s" % (qid, req_url))
            return [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("%s: %d %s" % (err, qid, req_url))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

# run:
pool = Pool(cpu_count())
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()
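
(For reference, url_routes is a list of (url, query-id) tuples. A minimal sketch of how such a list might be built - assuming the /viaroute URL format visible in the error message below and a hypothetical list of coordinate pairs - is:)

# Hypothetical construction of url_routes: one OSRM /viaroute URL per
# origin/destination pair, tagged with an integer query id so results can be
# matched back to the input rows.
url_template = "http://127.0.0.1:5005/viaroute?loc={},{}&loc={},{}&alt=false&geometry=false"
coord_pairs = [(44.779708, 4.2609877, 44.648439, 4.2811959)]  # example values taken from the error message below
url_routes = [
    (url_template.format(f_lat, f_lon, t_lat, t_lon), qid)
    for qid, (f_lat, f_lon, t_lat, t_lon) in enumerate(coord_pairs)
]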

However, I can't get the HTTPConnectionPool to work properly: it seems to create new sockets each time (I think) and then gives me the error:

HTTPConnectionPool(host='127.0.0.1', port=5005): Max retries exceeded with url: /viaroute?loc=44.779708,4.2609877&loc=44.648439,4.2811959&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))


My goal is to get distance calculations from an OSRM-routing server I am running locally (as quickly as possible).

I have a question in two parts - basically I am trying to convert some code using multiprocessing.Pool() to better code (proper asynchronous functions - so that the execution never breaks and it runs as fast as possible).

The issue I am having is that everything I try seems slower than multiprocessing (I present several examples below of what I have tried).

Some potential methods are: gevent, grequests, tornado, requests-futures, asyncio, etc.

A - Multiprocessing.Pool()

I initially started with something like this:

def ReqOsrm(url_input):
    req_url, query_id = url_input
    try_c = 0
    #print(req_url)
    while try_c < 5:
        try:
            response = requests.get(req_url)
            json_geocode = response.json()
            status = int(json_geocode['status'])
            # Found route between points
            if status == 200:
            ....

pool = Pool(cpu_count()-1) 
calc_routes = pool.map(ReqOsrm, url_routes)

Here I was connecting to a local server (localhost, port 5005), which was launched on 8 threads and supports parallel execution.

After a bit of searching I realised the error I was getting was because requests opens a new connection/socket for each request. So this was actually too fast and exhausted sockets after a while. It seems the way to address this is to use a requests.Session() - however I haven't been able to get this working with multiprocessing (where each process has its own session).

Question 1.

On some of the computers this runs fine, e.g.:


To compare against later: 45% server usage and 1700 requests per second

However, on some it does not and I don't fully understand why:

HTTPConnectionPool(host='127.0.0.1', port=5000): Max retries exceeded with url: /viaroute?loc=49.34343,3.30199&loc=49.56655,3.25837&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))

My guess is that, since requests locks a socket while it is in use, sometimes the server is too slow to respond to the old request and a new one is generated. The server supports queueing, but requests does not, so instead of the request being added to a queue I get the error?

Question 2.

I found:

Blocking Or Non-Blocking?

With the default Transport Adapter in place, Requests does not provide any kind of non-blocking IO. The Response.content property will block until the entire response has been downloaded. If you require more granularity, the streaming features of the library (see Streaming Requests) allow you to retrieve smaller quantities of the response at a time. However, these calls will still block.

If you are concerned about the use of blocking IO, there are lots of projects out there that combine Requests with one of Python’s asynchronicity frameworks.

Two excellent examples are grequests and requests-futures.

B - requests-futures

To address this I needed to rewrite my code to use asynchronous requests, so I tried the following using:

from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor, as_completed

(By the way I start my server with the option to use all threads)

And the main code:

calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
    # Submit requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i]  # url |query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid
    # Process the futures as they become complete
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data
        except Exception as err:
            print('No route')
            row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
        calc_routes.append(row)

Where my function (ReqOsrm) is now rewritten as:

def ReqOsrm(sess, resp):
    json_geocode = resp.json()
    status = int(json_geocode['status'])
    # Found route between points
    if status == 200:
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
    # Cannot find route between points (code errors as 999)
    else:
        out = [999, 0, 0, 0, 0, 0, 0]
    resp.data = out

However, this code is slower than the multiprocessing one! Before I was getting around 1700 requests a second, now I am getting 600 a second. I guess this is because I don't have full CPU utilisation, but I'm not sure how to increase it?


C - Thread

I tried another method (creating threads) - however, again, I wasn't sure how to get this to maximise CPU usage (ideally I want to see my server using 50%, no?):

import requests
from threading import Thread
from queue import Queue

def doWork():
    while True:
        url,qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()

def getReq(url):
    try:
        resp = requests.get(url)
        return resp.status_code, resp
    except:
        return 999, None

def processReq(status, resp, qid):
    try:
        json_geocode = resp.json()
        # Found route between points
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from = json_geocode['via_points'][0]
            used_to = json_geocode['via_points'][1]
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
        else:
            print("Done but no route")
            out = [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("Error: %s" % err)
        out = [qid, 999, 0, 0, 0, 0, 0, 0]
    qres.put(out)
    return

#Run:
concurrent = 1000
qres = Queue()
q = Queue(concurrent)

for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in url_routes:
        q.put(url)
    q.join()
except Exception:
    pass

# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]

This method is faster than requests-futures, I think, but I don't know how many threads to set to maximise it - one option is simply to measure, as in the sketch below.
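
A minimal sketch (not from the original timing runs) of one way to pick the thread count empirically rather than guessing: wrap the queue/worker code above in a function and measure requests per second for a few values of concurrent. run_threaded() here is a hypothetical wrapper around that code.

import time

def sweep_thread_counts(counts, urls):
    # run_threaded(n, urls) is assumed to execute the worker/queue code above
    # with `concurrent = n` and return once all results have been collected
    for n in counts:
        start = time.time()
        run_threaded(n, urls)
        print("threads=%4d -> %.0f requests/second" % (n, len(urls) / (time.time() - start)))

sweep_thread_counts([50, 100, 250, 500, 1000], url_routes)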


D - tornado (not working)

I am now trying tornado - however I can't quite get it working. It breaks with exit code -1073741819 if I use curl_httpclient; if I use simple_httpclient it works but then I get timeout errors:

ERROR:tornado.application:Multiple exceptions in yield list
Traceback (most recent call last):
  File "C:\Anaconda3\lib\site-packages\tornado\gen.py", line 789, in callback
    result_list.append(f.result())
  File "C:\Anaconda3\lib\site-packages\tornado\concurrent.py", line 232, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
tornado.httpclient.HTTPError: HTTP 599: Timeout

from functools import partial
from tornado import gen, ioloop
from tornado.escape import json_decode
from tornado.httpclient import AsyncHTTPClient

def handle_req(r):
    try:
        json_geocode = json_decode(r)
        status = int(json_geocode['status'])
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
        print(out)
    except Exception as err:
        print(err)
        out = [999, 0, 0, 0, 0, 0, 0]
    return out

# Configure
# For some reason curl_httpclient crashes my computer
AsyncHTTPClient.configure("tornado.simple_httpclient.SimpleAsyncHTTPClient", max_clients=10)

@gen.coroutine
def run_experiment(urls):
    http_client = AsyncHTTPClient()
    responses = yield [http_client.fetch(url) for url, qid in urls]
    responses_out = [handle_req(r.body) for r in responses]
    raise gen.Return(value=responses_out)

# Initialise
_ioloop = ioloop.IOLoop.instance()
run_func = partial(run_experiment, url_routes)
calc_routes = _ioloop.run_sync(run_func)
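
(A possible mitigation, not verified here: with max_clients=10 most fetches sit in simple_httpclient's internal queue, and the default 20-second request timeout can expire before a queued request ever runs, which would explain the HTTP 599 errors. Raising max_clients and the per-request timeouts may help; a sketch of the changed lines:)

# Sketch: allow more simultaneous requests and give each fetch a longer budget.
AsyncHTTPClient.configure("tornado.simple_httpclient.SimpleAsyncHTTPClient",
                          max_clients=100)

@gen.coroutine
def run_experiment(urls):
    http_client = AsyncHTTPClient()
    responses = yield [http_client.fetch(url, connect_timeout=60, request_timeout=600)
                       for url, qid in urls]
    raise gen.Return(value=[handle_req(r.body) for r in responses])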

E - asyncio / aiohttp

Decided to try another approach (although would be great to get tornado working) using asyncio and aiohttp.

import json
import asyncio
import aiohttp

def handle_req(data, qid):
    json_geocode = json.loads(data.decode('utf-8'))
    status = int(json_geocode['status'])
    if status == 200:
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
    else:
        print("Done, but not route for {0} - status: {1}".format(qid, status))
        out = [qid, 999, 0, 0, 0, 0, 0, 0]
    return out

def chunked_http_client(num_chunks):
    # Use semaphore to limit number of requests
    semaphore = asyncio.Semaphore(num_chunks)
    # Return a co-routine that will download files asynchronously and
    # respect the locking of the semaphore
    @asyncio.coroutine
    def http_get(url, qid):
        nonlocal semaphore
        with (yield from semaphore):
            response = yield from aiohttp.request('GET', url)
            body = yield from response.content.read()
            yield from response.wait_for_close()
        return body, qid
    return http_get

def run_experiment(urls):
    http_client = chunked_http_client(500)
    # http_client returns futures
    # save all the futures to a list
    tasks = [http_client(url, qid) for url, qid in urls]
    response = []
    # wait for futures to be ready then iterate over them
    for future in asyncio.as_completed(tasks):
        data, qid = yield from future
        try:
            out = handle_req(data, qid)
        except Exception as err:
            print("Error for {0} - {1}".format(qid,err))
            out = [qid, 999, 0, 0, 0, 0, 0, 0]
        response.append(out)
    return response

# Run:
loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))

This works OK, however still slower than multiprocessing!
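
(A possible refinement, not benchmarked here: aiohttp.request() sets up a new connection for every call, whereas a single shared ClientSession reuses its connection pool - the same idea as requests.Session()/HTTPConnectionPool elsewhere in this question. A drop-in sketch of chunked_http_client using one shared session, in the same yield-from style:)

def chunked_http_client(num_chunks):
    # One shared ClientSession (and hence one connection pool) for all requests,
    # instead of aiohttp.request() opening a fresh connection per call.
    session = aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=num_chunks))
    semaphore = asyncio.Semaphore(num_chunks)
    @asyncio.coroutine
    def http_get(url, qid):
        with (yield from semaphore):
            response = yield from session.get(url)
            body = yield from response.read()
        return body, qid
    return http_get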


  • Another approach, other than trying to fool around with the optimal thread pool size, is to use an event loop. You could register requests with a callback and wait for the event loop to handle them whenever a response is returned – dm03514 Mar 02 '16 at 20:03
  • @dm03514 Thanks for that! However, is this not what I have when I do my requests-futures example? `future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))` – mptevsion Mar 02 '16 at 20:04
  • I've never used RequestFuture, but I think it still delegates to a thread pool; the event loop should be a new request model altogether, and will only expose a single thread, so you don't have to worry about how many threads to configure to do work :) Python has one at https://pypi.python.org/pypi/aiohttp, which I've never used but looks relatively straightforward; tornado is a framework built on OS event libraries which has a simple API. http://tornadokevinlee.readthedocs.org/en/latest/httpclient.html – dm03514 Mar 02 '16 at 20:15
  • @dm03514 I tried to use aiohttp and it did a fairly good job (better than requests-futures). However, the speed is still slower than using multiprocessing - unless I am implementing it incorrectly and bottlenecking – mptevsion Mar 03 '16 at 14:20
  • @mptevsion I'm trying to do the same thing. Could you tell me what you mean when you say "...connecting to a local server (localhost, port 5005) which was launched on 8 threads and supports parallel execution."? How do you launch the OSRM server on 8 threads? How do you support parallel execution? Last question: could you give me an example of url_routes? What is qid? – Patrick May 01 '18 at 19:05

4 Answers


Thanks everyone for the help. I wanted to post my conclusions:

Since my HTTP requests are to a local server which processes the request instantly, it does not make much sense for me to use async approaches (compared with most cases, where requests are sent over the internet). The costly factor for me is actually sending the request and processing the feedback, which means I get much better speeds using multiple processes (threads suffer from the GIL). I should also use sessions to increase the speed (no need to close and re-open a connection to the SAME server) and to help prevent port exhaustion.

Here are all the methods I tried (working), with example RPS (requests per second):

Serial

S1. Serial GET request (no session) -> 215 RPS

import json
import requests

def ReqOsrm(data):
    url, qid = data
    try:
        response = requests.get(url)
        json_geocode = json.loads(response.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]
# Run:      
calc_routes = [ReqOsrm(x) for x in url_routes]

S2. Serial GET request (requests.Session()) -> 335 RPS

session = requests.Session()
def ReqOsrm(data):
    url, qid = data
    try:
        response = session.get(url)
        json_geocode = json.loads(response.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]
# Run:      
calc_routes = [ReqOsrm(x) for x in url_routes]

S3. Serial GET request (urllib3.HTTPConnectionPool) -> 545 RPS

from urllib3 import HTTPConnectionPool

conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=1)
def ReqOsrm(data):
    url, qid = data
    try:
        response = conn_pool.request('GET', url)
        json_geocode = json.loads(response.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]
# Run:      
calc_routes = [ReqOsrm(x) for x in url_routes]

Async IO

A4. AsyncIO with aiohttp -> 450 RPS

import json
import asyncio
import aiohttp
concurrent = 100
def handle_req(data, qid):
    json_geocode = json.loads(data.decode('utf-8'))
    tot_time_s = json_geocode['paths'][0]['time']
    tot_dist_m = json_geocode['paths'][0]['distance']
    return [qid, 200, tot_time_s, tot_dist_m]
def chunked_http_client(num_chunks):
    # Use semaphore to limit number of requests
    semaphore = asyncio.Semaphore(num_chunks)
    # Return a co-routine that will download files asynchronously and
    # respect the locking of the semaphore
    @asyncio.coroutine
    def http_get(url, qid):
        nonlocal semaphore
        with (yield from semaphore):
            with aiohttp.ClientSession() as session:
                response = yield from session.get(url)
                body = yield from response.content.read()
                yield from response.wait_for_close()
        return body, qid
    return http_get
def run_experiment(urls):
    http_client = chunked_http_client(num_chunks=concurrent)
    # http_client returns futures, save all the futures to a list
    tasks = [http_client(url, qid) for url, qid in urls]
    response = []
    # wait for futures to be ready then iterate over them
    for future in asyncio.as_completed(tasks):
        data, qid = yield from future
        try:
            out = handle_req(data, qid)
        except Exception as err:
            print("Error for {0} - {1}".format(qid,err))
            out = [qid, 999, 0, 0]
        response.append(out)
    return response
# Run:
loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))

A5. Threading without sessions -> 330 RPS

import json
import requests
from threading import Thread
from queue import Queue
concurrent = 100
def doWork():
    while True:
        url,qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()
def getReq(url):
    try:
        resp = requests.get(url)
        return resp.status_code, resp
    except:
        return 999, None
def processReq(status, resp, qid):
    try:
        json_geocode = json.loads(resp.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        out = [qid, 999, 0, 0]
    qres.put(out)
    return
#Run:
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
for url in url_routes:
    q.put(url)
q.join()
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]

A6. Threading with HTTPConnectionPool -> 1550 RPS

import json
from threading import Thread
from queue import Queue
from urllib3 import HTTPConnectionPool
concurrent = 100
conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=concurrent)
def doWork():
    while True:
        url,qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()
def getReq(url):
    try:
        resp = conn_pool.request('GET', url)
        return resp.status, resp
    except:
        return 999, None
def processReq(status, resp, qid):
    try:
        json_geocode = json.loads(resp.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        out = [qid, 999, 0, 0]
    qres.put(out)
    return
#Run:
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
for url in url_routes:
    q.put(url)
q.join()
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]

A7. requests-futures -> 520 RPS

from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor, as_completed
concurrent = 100
def ReqOsrm(sess, resp):
    try:
        json_geocode = resp.json()
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err)
        out = [999, 0, 0]
    resp.data = out
#Run:
calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=concurrent)) as session:
    # Submit requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i]  # url |query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid
    # Process the futures as they become complete
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data
        except Exception as err:
            print('No route')
            row = [futures[future], 999, 0, 0]
        calc_routes.append(row)

Multiple Processes

P8. multiprocessing.worker + queue + requests.session() -> 1058 RPS

import json
import requests
from multiprocessing import *
class Worker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.qin = qin
        self.qout = qout
    def run(self):
        s = requests.session()
        while not self.qin.empty():
            url, qid = self.qin.get()
            data = s.get(url)
            self.qout.put(ReqOsrm(data, qid))
            self.qin.task_done()
def ReqOsrm(resp, qid):
    try:
        json_geocode = json.loads(resp.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid)
        return [qid, 999, 0, 0]
# Run:
qout = Queue()
qin = JoinableQueue()
[qin.put(url_q) for url_q in url_routes]
[Worker(qin, qout).start() for _ in range(cpu_count())]
qin.join()
calc_routes = []
while not qout.empty():
    calc_routes.append(qout.get())

P9. multiprocessing.worker + queue + HTTPConnectionPool() -> 1230 RPS
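
(The code for P9 was not included; a plausible reconstruction - not the original - assuming it mirrors P8 but with a per-process urllib3 connection pool instead of a requests session:)

import json
from multiprocessing import Process, Queue, JoinableQueue, cpu_count
from urllib3 import HTTPConnectionPool

class Worker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.qin = qin
        self.qout = qout
    def run(self):
        # one connection pool per worker process
        conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=1)
        while not self.qin.empty():
            url, qid = self.qin.get()
            try:
                json_geocode = json.loads(conn_pool.request('GET', url).data.decode('utf-8'))
                out = [qid, 200, json_geocode['paths'][0]['time'], json_geocode['paths'][0]['distance']]
            except Exception as err:
                print("Error: ", err, qid)
                out = [qid, 999, 0, 0]
            self.qout.put(out)
            self.qin.task_done()
# Run: same qin/qout setup and result collection as P8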

P10. multiprocessing v2: Pool.map with an initializer that gives each process its own HTTPConnectionPool (not really sure how this differs from P9 in practice) -> 1350 RPS

import json
from multiprocessing import Pool
from urllib3 import HTTPConnectionPool

conn_pool = None
def makePool(host, port):
    global conn_pool
    pool = conn_pool = HTTPConnectionPool(host=host, port=port, maxsize=1)
def ReqOsrm(data):
    url, qid = data
    try:
        response = conn_pool.request('GET', url)
        json_geocode = json.loads(response.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        return [qid, 999, 0, 0]
# Run:
pool = Pool(initializer=makePool, initargs=(ghost, gport))
calc_routes = pool.map(ReqOsrm, url_routes)

So, in conclusion, it seems that the best methods for me are P10 (and, surprisingly, A6).

    Another approach you could try is to use multiprocessing with asyncio (or gevent). I've only used gevent, but it can only take advantage of a single core due to being single threaded coroutines. Coroutine switches should be faster than threads so multiprocessing + coroutines might be fastest. – Eric Conner Mar 07 '16 at 17:57
  • Are you going to select an answer? – RootTwo Mar 11 '16 at 07:23
  • I'm getting the error: ChunkedEncodingError(ProtocolError('Connection broken: IncompleteRead(162 bytes read)', IncompleteRead(162 bytes read)) when running P8 – ballade4op52 Mar 13 '16 at 10:00

Question 1

You get the error because this approach:

def ReqOsrm(url_input):
    req_url, query_id = url_input
    try_c = 0
    #print(req_url)
    while try_c < 5:
        try:
            response = requests.get(req_url)
            json_geocode = response.json()
            status = int(json_geocode['status'])
            # Found route between points
            if status == 200:
            ....

pool = Pool(cpu_count()-1) 
calc_routes = pool.map(ReqOsrm, url_routes)

creates a new TCP connection for each requested URL, and at some point it fails simply because the system runs out of free local ports. To confirm that, you can run netstat while your code is executing:

netstat -a -n | find /c "localhost:5005"

This will give you the number of connections to the server.

Also, reaching 1700 RPS looks quite unrealistic for this approach, since requests.get is quite an expensive operation and it is unlikely that you can get even 50 RPS this way. So you probably need to double-check your RPS calculations.

To avoid the error you need to use sessions instead of creating connections from scratch:

import multiprocessing
import requests
import time


class Worker(multiprocessing.Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.qin = qin
        self.qout = qout

    def run(self):
        s = requests.session()
        while not self.qin.empty():
            result = s.get(self.qin.get())
            self.qout.put(result)
            self.qin.task_done()

if __name__ == '__main__':
    start = time.time()

    qin = multiprocessing.JoinableQueue()
    [qin.put('http://localhost:8080/') for _ in range(10000)]

    qout = multiprocessing.Queue()

    [Worker(qin, qout).start() for _ in range(multiprocessing.cpu_count())]

    qin.join()

    result = []
    while not qout.empty():
        result.append(qout.get())

    print(time.time() - start)
    print(result)

Question 2

You won't get higher RPS with threads or async approaches unless I/O takes more time than calculations (e.g. high network latency, large responses, etc.), because threads are affected by the GIL (they run in the same Python process) and asynchronous libraries can be blocked by long-running calculations.

Although threads or async libs can improve performance, running the same threaded or asynchronous code in multiple processes will give you even more performance anyway.
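
(A sketch, not from the original answer, of what that combination might look like for this question: split the URLs into chunks and let each process run a small thread pool over its own connection pool.)

import json
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Pool, cpu_count
from urllib3 import HTTPConnectionPool

def fetch_chunk(chunk):
    # each worker process gets its own connection pool and its own thread pool
    conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=10)
    def fetch_one(item):
        url, qid = item
        try:
            data = json.loads(conn_pool.request('GET', url).data.decode('utf-8'))
            return [qid, 200, data['route_summary']['total_time'],
                    data['route_summary']['total_distance']]
        except Exception:
            return [qid, 999, 0, 0]
    with ThreadPoolExecutor(max_workers=10) as executor:
        return list(executor.map(fetch_one, chunk))

if __name__ == '__main__':
    n = cpu_count()
    chunks = [url_routes[i::n] for i in range(n)]
    with Pool(n) as p:
        calc_routes = [row for part in p.map(fetch_chunk, chunks) for row in part]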


Looking at your multiprocessing code at the top of the question, it seems that HTTPConnectionPool() is being called each time ReqOsrm is called, so a new pool is created for each URL. Instead, use the initializer and initargs parameters to create a single pool for each process.

import json
from multiprocessing import Pool
from urllib3 import HTTPConnectionPool

conn_pool = None

def makePool(host, port):
    global conn_pool
    pool = conn_pool = HTTPConnectionPool(host=host, port=port, maxsize=1)

def ReqOsrm(url_input):
    ul, qid = url_input

    try:
        response = conn_pool.request('GET', ul)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
            return out

        else:
            print("Done but no route: %d %s" % (qid, req_url))
            return [qid, 999, 0, 0, 0, 0, 0, 0]

    except Exception as err:
        print("%s: %d %s" % (err, qid, req_url))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

if __name__ == "__main__":
    # run:
    pool = Pool(initializer=makePool, initargs=('127.0.0.1', 5005))
    calc_routes = pool.map(ReqOsrm, url_routes)
    pool.close()
    pool.join()

The requests-futures version appears to have an indentation error. The loop for future in as_completed(futures): is indented under the outer loop for i in range(len(url_routes)):. So a request is made in the outer loop, and then the inner loop waits for that future to return before the next iteration of the outer loop. This makes the requests run serially rather than in parallel.

I think the code should be as follows:

calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
    # Submit all the requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i]  # url |query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid

    # this was indented under the code in section B of the question
    # process the futures as they become complete
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data

        except Exception as err:
            print('No route')
            row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
        print(row)
        calc_routes.append(row)

Here is a pattern I've used with gevent, which is coroutine-based and may not suffer from the GIL. This may be faster than using threads, and perhaps fastest when used in combination with multiprocessing (as written it only uses one core):

from gevent import monkey
monkey.patch_all()

import logging
import random
import time
from threading import Thread

from gevent.queue import JoinableQueue

# stdlib logging setup (replaces a project-specific logger helper)
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)


class Worker(Thread):

    def __init__(self, worker_idx, queue):
        # initialize the base class
        super(Worker, self).__init__()
        self.worker_idx = worker_idx
        self.queue = queue

    def log(self, msg):
        log.info("WORKER %s - %s" % (self.worker_idx, msg))

    def do_work(self, line):
        #self.log(line)
        time.sleep(random.random() / 10)

    def run(self):
        while True:
            line = self.queue.get()
            self.do_work(line)
            self.queue.task_done()


def main(number_of_workers=20):
    start_time = time.time()

    queue = JoinableQueue()
    for idx in range(number_of_workers):
        worker = Worker(idx, queue)
        # "daemonize" a thread to ensure that the threads will
        # close when the main program finishes
        worker.daemon = True
        worker.start()

    for idx in range(100):
        queue.put("%s" % idx)

    queue.join()
    time_taken = time.time() - start_time
    log.info("Parallel work took %s seconds." % time_taken)

    start_time = time.time()
    for idx in range(100):
        #log.info(idx)
        time.sleep(random.random() / 10)
    time_taken = time.time() - start_time
    log.info("Sync work took %s seconds." % time_taken)


if __name__ == "__main__":
    main()
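
(A sketch of the multiprocessing combination mentioned above, assuming a fork-based platform so the monkey-patched state is inherited; each process runs its own copy of main() and therefore its own gevent loop:)

from multiprocessing import Process, cpu_count

if __name__ == "__main__":
    # in place of the single main() call above: one gevent worker pool per core
    procs = [Process(target=main) for _ in range(cpu_count())]
    for p in procs:
        p.start()
    for p in procs:
        p.join()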