I am trying to use asyncssh and asyncio.gather to execute multiple concurrent ssh commands. For example:

import asyncio, asyncssh, sys

async def run(ssh, cmd):
    return (await ssh.run(cmd, check=True)).stdout

async def main():
    host = sys.argv[1]

    ssh = await asyncssh.connect(host)
    nproc = int(await run(ssh, 'nproc'))
    cmds = [run(ssh, 'cpufreq-info -c{} -p'.format(core)) for core in range(nproc)]
    for ret in await asyncio.gather(*cmds):
        print(ret)

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())

If I force the value of nproc to a small number (<10) the program works correctly, but with the real value on my machine (12), I get the following error:

Traceback (most recent call last):
  File "./mwe.py", line 17, in <module>
    asyncio.get_event_loop().run_until_complete(main())
  File "/usr/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
    return future.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 241, in _step
    result = coro.throw(exc)
  File "./mwe.py", line 13, in main
    for ret in await asyncio.gather(*cmds):
  File "/usr/lib/python3.5/asyncio/futures.py", line 361, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/usr/lib/python3.5/asyncio/tasks.py", line 296, in _wakeup
    future.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 241, in _step
    result = coro.throw(exc)
  File "./mwe.py", line 5, in run
    return (await ssh.run(cmd, check=True)).stdout
  File "/usr/local/lib/python3.5/dist-packages/asyncssh/connection.py", line 3103, in run
    process = await self.create_process(*args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/asyncssh/connection.py", line 3009, in create_process
    *args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/asyncssh/connection.py", line 2927, in create_session
    bool(self._agent_forward_path))
  File "/usr/local/lib/python3.5/dist-packages/asyncssh/channel.py", line 1012, in create
    packet = await self._open(b'session')
  File "/usr/local/lib/python3.5/dist-packages/asyncssh/channel.py", line 633, in _open
    return await self._open_waiter
  File "/usr/lib/python3.5/asyncio/futures.py", line 361, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/usr/lib/python3.5/asyncio/tasks.py", line 296, in _wakeup
    future.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
asyncssh.misc.ChannelOpenError: open failed

Is there a limit on the number of concurrent commands? If so, how do I find it, and how do I stay within it using asyncio?

haggai_e
  • The limitation, if there is one, seems to come from `asyncssh` rather than `asyncio`. I think the fastest way to find out is to ask in the [project's issues](https://github.com/ronf/asyncssh/issues). In any case, you can limit the number of concurrent commands with `asyncio.Semaphore` ([example](https://stackoverflow.com/questions/48483348/how-to-limit-concurrency-with-python-asyncio/48486557#48486557) of usage). – Mikhail Gerasimov Jan 06 '20 at 16:49
  • Thanks, I'll try reporting an issue. I've been trying to work around the problem with my own version of gather, but a semaphore looks cleaner. – haggai_e Jan 07 '20 at 18:59

1 Answer

Based on Mikhail's advice and the answers on the project issue, the cause appears to be the maximum number of sessions the SSH server allows per connection. As I am using an OpenSSH server, my workaround reads the limit with a basic parser that looks for the MaxSessions setting in /etc/ssh/sshd_config, falling back to 10 (the OpenSSH default) when it is not set. It then creates a semaphore that limits the number of outstanding run calls, so the program never exceeds the server's limit.

#!/usr/bin/python3
import asyncio, asyncssh, sys

async def run(ssh, sem, cmd):
    async with sem:
        return (await ssh.run(cmd, check=True)).stdout

async def main():
    host = sys.argv[1]

    ssh = await asyncssh.connect(host)
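    # Look up MaxSessions in the server's sshd_config. stdout is a string and
    # is empty when the setting is absent or commented out.
    max_sessions = (await ssh.run(r'sed -n "s/^MaxSessions\s*\([[:digit:]]*\)/\1/p" '
                                  '/etc/ssh/sshd_config', check=True)).stdout
    # asyncio.Semaphore needs an int: take the first value found, else default to 10.
    values = max_sessions.split()
    max_sessions = int(values[0]) if values else 10
    print('MaxSessions {}'.format(max_sessions))
    sem = asyncio.Semaphore(max_sessions)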
    nproc = int(await run(ssh, sem, 'nproc'))
    cmds = [run(ssh, sem, 'cpufreq-info -c{} -p'.format(core)) for core in range(nproc)]
    for ret in await asyncio.gather(*cmds):
        print(ret)

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
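
To see the semaphore pattern on its own, without an SSH server, here is a minimal sketch. It is not part of the original workaround: `fake_run`, `demo` and `run_demo` are hypothetical names, `asyncio.sleep` stands in for `ssh.run`, and `asyncio.run` assumes Python 3.7 or later. It records how many "commands" are in flight at once, to show that the semaphore caps that number at the chosen limit.

```python
import asyncio

async def fake_run(sem, stats, cmd):
    # Hypothetical stand-in for ssh.run(): holds a semaphore slot while "running".
    async with sem:
        stats['active'] += 1
        stats['peak'] = max(stats['peak'], stats['active'])
        await asyncio.sleep(0.01)  # simulate the remote command taking time
        stats['active'] -= 1
        return 'done: ' + cmd

async def demo(limit, ncmds):
    # At most `limit` coroutines can be inside the `async with sem` block at once.
    sem = asyncio.Semaphore(limit)
    stats = {'active': 0, 'peak': 0}
    cmds = [fake_run(sem, stats, 'cmd{}'.format(i)) for i in range(ncmds)]
    results = await asyncio.gather(*cmds)  # results keep the order of cmds
    return results, stats['peak']

def run_demo(limit=10, ncmds=12):
    # Returns (list of results, highest number of simultaneously active commands).
    return asyncio.run(demo(limit, ncmds))

if __name__ == '__main__':
    results, peak = run_demo()
    print('{} commands finished, peak concurrency {}'.format(len(results), peak))
```

All 12 coroutines are still handed to `asyncio.gather` at once; the extra ones simply wait on the semaphore until a slot frees up, so the results come back in the original order.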
haggai_e