1

I'm sending an XHR request to my Flask server in order to run several pings on a network.

Resource

def get(self, site_id):
    """Ping every printer of a site and return the collected statistics.

    NOTE(review): `response` and `printers` are built in the elided part
    (`…`); presumably `response[site_id]` is already a dict — confirm.
    """
    …
    # network_utils.ping is called synchronously inside the loop, so the
    # printers are pinged one after another and the total request time grows
    # with the number of printers.
    for printer in printers:
        hostname = printer['hostname']
        response[site_id][hostname] = network_utils.ping(hostname)

    return response

Ping

Under the hood, `shell.execute` uses `subprocess.check_output` to run the native `ping` command:

def ping(hostname):
    """Send one echo request to *hostname* and return the parsed statistics.

    Runs ``ping -c 1 -W 1 -q <hostname>`` through ``shell.execute`` and
    passes the ``'results'`` entry of its return value to
    ``output_parser.ping``.
    """
    argv = ['ping', '-c', '1', '-W', '1', '-q', hostname]
    raw_output = shell.execute(argv)['results']
    return output_parser.ping(raw_output)

Output

{
    "test-site": {
        "avg": 0.093, "max": 0.093, "mdev": 0.0, "min": 0.093,
        "1.1.1.1": { "avg": null, "max": null, "mdev": null, "min": null},
        "1.2.3.4": { "avg": null, "max": null, "mdev": null, "min": null},
        "127.0.0.1": { "avg": 0.061, "max": 0.061, "mdev": 0.0, "min": 0.061}
    }
}

Questions

The pings run sequentially, which makes the request very slow (tens of seconds). How can I speed things up?

Édouard Lopez
  • 40,270
  • 28
  • 126
  • 178

3 Answers

3

Threading sounds like the best choice, because your workload is I/O bound. I'm using a Semaphore to limit concurrency to 5 threads.

I'm passing the shared `response` dict to the `ping` function; assigning individual dict items is thread-safe in CPython, but you should read this if you plan to do something more complicated.

def get(self, site_id):
    …
    semaphore = threading.Semaphore(5)
    threads = []

    for printer in printers:
        hostname = printer['hostname']
        threads.append(threading.Thread(target=network_utils.ping,
                          args=(semaphore, response, site_id, hostname)))

    # Start and wait to all threads to finish
    map(lambda t: t.start(), threads)
    map(lambda t: t.join(), threads)

    return response

def ping(semaphore, response, site_id, hostname):
    """Ping *hostname* and store the parsed statistics in a shared dict.

    Meant to be used as a ``threading.Thread`` target.

    :param semaphore: shared semaphore limiting concurrent pings.
    :param response: dict shared between threads; ``response[site_id]`` must
        already exist, the result is stored at ``response[site_id][hostname]``.
    :param site_id: key of the site entry inside *response*.
    :param hostname: host to ping.
    """
    command = ['ping', '-c', '1', '-W', '1', '-q', hostname]
    # `with` releases the semaphore even if the ping or the parsing raises;
    # a bare acquire()/release() pair would permanently leak a slot.
    with semaphore:
        # Use a distinct name: rebinding `response` here would shadow the
        # shared dict, and the result would be written into the shell output.
        shell_result = shell.execute(command)
        ping_data = output_parser.ping(shell_result['results'])

    response[site_id][hostname] = ping_data
Community
  • 1
  • 1
Or Duan
  • 13,142
  • 6
  • 60
  • 65
2

Make the subprocess asynchronous via gevent, for example.

from gevent import subprocess
import gevent

def ping(hostname):
    """Start ``ping`` for *hostname* without waiting and return the process.

    The returned ``Popen`` object has its stdout piped; callers are expected
    to wait for it and then read the output themselves.
    """
    return subprocess.Popen(
        ['ping', '-c', '1', '-W', '1', '-q', hostname],
        stdout=subprocess.PIPE,
    )

def get(self, site_id):
    """Ping every printer of a site in parallel using gevent subprocesses.

    NOTE(review): `response` and `printers` come from the elided part (`…`);
    confirm their shapes against the full handler.
    """
    …
    # Start all the pings in parallel, asynchronously
    # Use dict to reference host: ping subprocess
    # as results will come in at different times
    pings = {printer['hostname']: ping(printer['hostname']) 
             for printer in printers}
    # Wait for all of them to complete
    gevent.wait(pings.values())
    # NOTE(review): stdout is read only after each process has exited, which
    # relies on the `ping -q` summary fitting in the pipe buffer. Without
    # text mode `.read()` presumably returns bytes on Python 3 — confirm
    # output_parser.ping accepts bytes.
    for hostname in pings:
        response[site_id][hostname] = output_parser.ping(pings[hostname].stdout.read())
    return response
danny
  • 5,140
  • 1
  • 19
  • 31
  • why did you modify the `ping()` function? – Édouard Lopez Aug 19 '16 at 13:33
  • check_output is a blocking function and will always make the subprocess calls sequential. With gevent making them asynchronous, you need to first start all the subprocesses in parallel and then collect their output once they have all completed. – danny Aug 19 '16 at 15:04
  • Note that asynchronous means without threads. Gevent can easily handle hundreds or thousands of parallel subprocesses, especially since yours are I/O bound. With threading, performance suffers once the number of threads exceeds the number of physical cores available. – danny Aug 19 '16 at 15:12
1

Please upvote Or Duan's answer, as mine is based on it:

Resource

class Ping(Resource):
    """REST resource that pings a site's host together with its printers."""

    def get(self, site_id):
        """Return the ping statistics built by ``network_utils.parellelize``.

        The site hostname is the first entry of ``mast_utils.list_sites``;
        the printers are the ``channels`` of ``mast_utils.list_printers``.
        """
        sites = mast_utils.list_sites(site_id)['results']
        site_hostname = sites[0]['hostname']
        printers = mast_utils.list_printers(site_id)['results']['channels']
        return network_utils.parellelize(network_utils.ping, site_hostname, printers)


api.add_resource(Ping, '/ping/<string:site_id>/')

network_utils.py

def ping(hostname):
    """Ping *hostname* briefly and return the statistics parsed from the output.

    Runs ``ping -q <hostname> -w 1 -W 1 -i 0.2`` via ``shell.execute``.
    Per the Linux iputils ``ping`` man page: ``-q`` prints only the summary,
    ``-w 1`` is an overall 1 s deadline, ``-W 1`` a 1 s reply timeout, and
    ``-i 0.2`` sends one packet every 200 ms.
    """
    options = ['-w', '1', '-W', '1', '-i', '0.2']
    command = ['ping', '-q', hostname] + options
    raw_output = shell.execute(command)['results']
    return output_parser.ping(raw_output)


def collect(task, response, **kwargs):
    """Run ``task(**kwargs)`` and store its result under ``kwargs['hostname']``.

    Used as a thread target, with *response* being a dict shared by the
    threads. Raises ``KeyError`` (before calling *task*) when no
    ``hostname`` keyword is given.
    """
    key = kwargs['hostname']
    result = task(**kwargs)
    response[key] = result


def parellelize(task, site_id, printers, **kwargs):
    """Run *task* for a site host and all of its printers concurrently.

    *task* is invoked through ``collect`` with ``hostname=<host>`` plus any
    extra *kwargs*. The site's result becomes ``response[site_id]``, and
    every printer's result is merged into it, keyed by printer hostname::

        {site_id: {...site result..., printer_hostname: {...}, ...}}

    :param task: callable accepting ``hostname=...`` (and *kwargs*) and
        returning a dict.
    :param site_id: hostname of the site itself.
    :param printers: iterable of dicts, each with a ``'hostname'`` key.
    :returns: the nested response dict described above.
    """
    printers_response = {}
    # One thread per printer. All threads are started before any is joined,
    # so the I/O-bound tasks overlap. Joining right after each start (as the
    # previous version did) ran them strictly one after another.
    threads = [
        threading.Thread(
            target=collect,
            args=(task, printers_response),
            kwargs=dict(kwargs, hostname=printer['hostname']),
        )
        for printer in printers
    ]
    for thread in threads:
        thread.start()

    # Run the site's own task on this thread while the printer threads work.
    # `finally` makes sure no thread is left running unjoined if it raises.
    response = {}
    try:
        collect(task, response, **dict(kwargs, hostname=site_id))
    finally:
        for thread in threads:
            thread.join()

    response[site_id].update(printers_response)
    return response

test_network_utils.py

class NetwrokUtilsTestCase(unittest.TestCase):
    """Integration tests for network_utils; they run the real ``ping`` binary."""

    def test_ping_is_null_when_host_unreachable(self):
        """An unresolvable host yields all-None statistics."""
        hostname = 'unreachable'

        response = network_utils.ping(hostname)

        self.assertDictEqual(response, {
            'avg': None,
            'max': None,
            'mdev': None,
            'min': None
        })

    def test_ping_reply_time_when_reachable(self):
        """The loopback address answers with a positive average time."""
        hostname = '127.0.0.1'

        response = network_utils.ping(hostname)

        self.assertGreater(response['avg'], 0)

    def test_ping_with_only_a_site(self):
        """With no printers, only the site's own statistics are returned."""
        site_hostname = 'localhost'
        printers = []

        response = network_utils.parellelize(network_utils.ping, site_hostname, printers)

        self.assertGreater(response[site_hostname]['avg'], 0)

    def test_ping_with_printers(self):
        """Printer results are nested inside the site's entry."""
        site_hostname = 'localhost'
        printers = [
            {'hostname': '127.0.0.1', 'port': 22},
            {'hostname': '0.0.0.0', 'port': 22},
        ]

        response = network_utils.parellelize(network_utils.ping, site_hostname, printers)

        self.assertGreater(response[site_hostname]['avg'], 0)
        self.assertGreater(response[site_hostname]['127.0.0.1']['avg'], 0)
        # Every printer must get an entry, even if its statistics are None.
        for printer in printers:
            self.assertIn(printer['hostname'], response[site_hostname])
Community
  • 1
  • 1
Édouard Lopez
  • 40,270
  • 28
  • 126
  • 178