
I have been attempting to write a ping scan that uses a limited number of concurrent processes. I tried `asyncio.as_completed` without success and switched to `asyncio.wait` with `return_when=asyncio.FIRST_COMPLETED`.

The following complete script works if the offending line is commented out. I'd like to collect the tasks in a set in order to get rid of `pending = list(pending)`, however `pending_set.union(task)` raises `RuntimeError: await wasn't used with future`.

"""Test simultaneous pings, limiting processes."""
import asyncio
from time import asctime

pinglist = [
    '127.0.0.1', '192.168.1.10', '192.168.1.20', '192.168.1.254',
    '192.168.177.20', '192.168.177.100', '172.17.1.1'
]


async def ping(ip):
    """Run external ping."""
    p = await asyncio.create_subprocess_exec(
        'ping', '-n', '-c', '1', ip,
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL
    )
    return await p.wait()


async def run():
    """Run the test, uses some processes and will take a while."""
    iplist = pinglist[:]
    pending = []
    pending_set = set()
    tasks = {}
    while len(pending) or len(iplist):
        while len(pending) < 3 and len(iplist):
            ip = iplist.pop()
            print(f"{asctime()} adding {ip}")
            task = asyncio.create_task(ping(ip))
            tasks[task] = ip
            pending.append(task)
            pending_set.union(task)  # comment this line and no error
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        pending = list(pending)
        for taskdone in done:
            print(' '.join([
                asctime(),
                ('BAD' if taskdone.result() else 'good'),
                tasks[taskdone]
            ]))

if __name__ == '__main__':
    asyncio.run(run())

3 Answers


There are two problems with pending_set.union(task):

  • union doesn't update the set in place; it returns a new set consisting of the original one and the elements of the argument it receives.

  • It accepts an iterable collection (such as another set), not a single element. Thus union attempts to iterate over task, which doesn't make sense. To make things more confusing, task objects are technically iterable in order to be usable in yield from expressions, but they detect iteration attempts in non-async contexts and report the error you've observed.

To fix both issues, you should use the add method instead, which operates by side effect and accepts a single element to add to the set:

pending_set.add(task)
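The difference is easy to demonstrate with plain values instead of tasks (a minimal sketch):

```python
s = {1, 2}

# union returns a NEW set built from s and the elements of its
# iterable argument; s itself is left unchanged
t = s.union([3])
assert s == {1, 2}
assert t == {1, 2, 3}

# add mutates s in place and takes a single element, not an iterable
s.add(3)
assert s == {1, 2, 3}
```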

Note that a more idiomatic way to limit concurrency in asyncio is using a Semaphore. For example (untested):

async def run():
    limit = asyncio.Semaphore(3)
    async def wait_and_ping(ip):
        async with limit:
            print(f"{asctime()} adding {ip}")
            result = await ping(ip)
        print(asctime(), ip, ('BAD' if result else 'good'))
    await asyncio.gather(*[wait_and_ping(ip) for ip in pinglist])
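To verify that the semaphore really caps concurrency, here is a runnable sketch of the same pattern with `asyncio.sleep` standing in for the external ping; `fake_ping` and the counters are illustrative, not part of the original code:

```python
import asyncio

async def main():
    limit = asyncio.Semaphore(3)   # at most 3 "pings" in flight
    running = 0
    peak = 0

    async def fake_ping(ip):
        nonlocal running, peak
        async with limit:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)   # stand-in for the subprocess
            running -= 1
        return 0

    ips = [f"10.0.0.{i}" for i in range(10)]
    # gather returns results in argument order
    results = await asyncio.gather(*[fake_ping(ip) for ip in ips])
    return peak, results

peak, results = asyncio.run(main())
assert peak <= 3            # concurrency never exceeded the limit
assert results == [0] * 10  # every "ping" completed
```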
user4815162342
  • 141,790
  • 18
  • 296
  • 355
  • .add fixed my stated problem perfectly. Your rephrasing of the structure stripped about 20 lines out of my solution for the same outcome (I am still working on understanding this). Thank you. – jwal Mar 10 '21 at 05:18
  • @jwal Feel free to ask if you need clarifications of the Semaphore based solution. Note that it's a pretty popular idiom, you can e.g. google aiohttp download semaphore and you'll find a bunch of references to the same technique. Or look at answers to [this question](https://stackoverflow.com/questions/48483348/how-to-limit-concurrency-with-python-asyncio), etc. – user4815162342 Mar 10 '21 at 18:52
0

Use await asyncio.gather(*pending_set)

  • asyncio.gather() accepts any number of awaitables and also returns one
  • * unpacks the set
    >>> "{} {} {}".format(*set((1,2,3)))
    '1 2 3'
    

Example from the docs

await asyncio.gather(
    factorial("A", 2),
    factorial("B", 3),
    factorial("C", 4),
)
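The docs example above depends on a `factorial` coroutine defined elsewhere; a self-contained version of the same pattern (with illustrative coroutine names) looks like this:

```python
import asyncio

async def square(n):
    await asyncio.sleep(0)      # yield to the event loop
    return n * n

async def main():
    # gather runs the awaitables concurrently and returns their
    # results in argument order
    return await asyncio.gather(square(2), square(3), square(4))

print(asyncio.run(main()))      # [4, 9, 16]
```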
  • This assumes you have the tasks all ready at one time and are happy to spawn a subprocess for every one at the same time, this is a resource peak my approach avoids. – jwal Mar 09 '21 at 18:41

I solved this in my original application without queuing the ping targets, which simplified things. This answer adds a gradually received list of targets and applies the useful pointers from @user4815162342, completing the answer to the original question.

import asyncio
import time

pinglist = ['127.0.0.1', '192.168.1.10', '192.168.1.20', '192.168.1.254',
    '192.168.177.20', '192.168.177.100', '172.17.1.1']

async def worker(queue):
    limit = asyncio.Semaphore(4)    # restrict the rate of work

    async def ping(ip):
        """Run external ping."""
        async with limit:
            print(f"{time.time():.2f} starting {ip}")
            p = await asyncio.create_subprocess_exec(
                'ping', '-n', '-c', '1', ip,
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL
            )
            return (ip, await p.wait())

    async def get_assign():
        return await queue.get()

    assign = {asyncio.create_task(get_assign())}
    pending = set()

Maintaining two distinct pending sets proved key. One set holds a single task that receives assigned addresses; it completes and must be restarted each time. The other set holds the ping tasks, which each run once and are then complete.

    while len(assign) + len(pending) > 0:  # stop condition
        done, pending = await asyncio.wait(
            set().union(assign, pending),
            return_when=asyncio.FIRST_COMPLETED
        )
        for job in done:
            if job in assign:
                if job.result() is None:
                    assign = set()  # for stop condition
                else:
                    pending.add(asyncio.create_task(ping(job.result())))
                    assign = {asyncio.create_task(get_assign())}
            else:
                print(
                    f"{time.time():.2f} result {job.result()[0]}"
                    f" {['good', 'BAD'][job.result()[1]]}"
                )

The remainder is straightforward.

async def assign(queue):
    """Assign tasks as if these are arriving gradually."""
    print(f"{time.time():.2f} start assigning")
    for task in pinglist:
        await queue.put(task)
        await asyncio.sleep(0.1)
    await queue.put(None)           # to stop nicely

async def main():
    queue = asyncio.Queue()
    await asyncio.gather(worker(queue), assign(queue))

if __name__ == '__main__':
    asyncio.run(main())

The output of this is (on my network, with 172.17.1.1 failing to respond):

1631611141.70 start assigning
1631611141.70 starting 127.0.0.1
1631611141.71 result 127.0.0.1 good
1631611141.80 starting 192.168.1.10
1631611141.81 result 192.168.1.10 good
1631611141.91 starting 192.168.1.20
1631611142.02 starting 192.168.1.254
1631611142.03 result 192.168.1.254 good
1631611142.13 starting 192.168.177.20
1631611142.23 starting 192.168.177.100
1631611142.24 result 192.168.177.100 good
1631611142.34 starting 172.17.1.1
1631611144.47 result 192.168.1.20 good
1631611145.11 result 192.168.177.20 good
1631611145.97 result 172.17.1.1 BAD