0

We are trying to use asyncio to run a straightforward client/server. The server is an echo server with two possible commands sent by the client, "quit" and "timer". The timer command starts a timer that will print a message in the console every second (at the server and client), and the quit command closes the connection.

The actual problem is the following:

When we run the server and the client, and we start the timer, the result of the timer is not sent to the client. It blocks the server and the client. I believe that the problem is on the client's side. However, I was not able to detect it.

Server

import asyncio
import time

HOST = "127.0.0.1"
PORT = 9999

class Timer(object):
    '''Simple timer class that can be started and stopped.'''
    def __init__(self, writer: asyncio.StreamWriter, name = None, interval = 1) -> None:
        self.name = name
        self.interval = interval
        self.writer = writer

    async def _tick(self) -> None:
        while True:
            await asyncio.sleep(self.interval)
            delta = time.time() - self._init_time
            self.writer.write(f"Timer {delta} ticked\n".encode())
            self.writer.drain()
            print("Delta time: ", delta)

    async def start(self) -> None:
        self._init_time = time.time()
        self.task = asyncio.create_task(self._tick())

    async def stop(self) -> None:
        self.task.cancel()
        print("Delta time: ", time.time() - self._init_time)

async def msg_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    '''Handle the echo protocol.'''
    # timer task that the client can start:
    timer_task = False

    try:
        while True:

            data = await reader.read(1024) # Read 256 bytes from the reader. Size of the message
            msg = data.decode() # Decode the message

            addr, port = writer.get_extra_info("peername") # Get the address of the client
            print(f"Received {msg!r} from {addr}:{port!r}")

            send_message = "Message received: " + msg
            writer.write(send_message.encode()) # Echo the data back to the client
            await writer.drain()  # This will wait until everything is clear to move to the next thing.

            if data == b"quit" and timer_task is True:
                # cancel the timer_task (if any)
                if timer_task:
                    timer_task.cancel()
                    await timer_task
                writer.close()  # Close the connection
                await writer.wait_closed()  # Wait for the connection to close


            elif data == b"quit" and timer_task is False:
                writer.close() # Close the connection
                await writer.wait_closed() # Wait for the connection to close

            elif data == b"start" and timer_task is False:
                print("Starting timer")
                t = Timer(writer)
                timer_task = True
                await t.start()

            elif data == b"stop" and timer_task is True:
                print("Stopping timer")
                await t.stop()
                timer_task = False

    except ConnectionResetError:
        print("Client disconnected")


async def run_server() -> None:
    # Our awaitable callable.
    # This callable is ran when the server recieves some data
    server = await asyncio.start_server(msg_handler, HOST, PORT)

    async with server:
        await server.serve_forever()


if __name__ == "__main__":
    loop = asyncio.new_event_loop() # new_event_loop() is for python 3.10. For older versions, use get_event_loop()
    loop.run_until_complete(run_server())

Client

import asyncio

HOST = '127.0.0.1'
PORT = 9999


async def run_client() -> None:
    # It's a coroutine. It will wait until the connection is established
    reader, writer = await asyncio.open_connection(HOST, PORT)

    while True:

        message = input('Enter a message: ')
        writer.write(message.encode())
        await writer.drain()

        data = await reader.read(1024)
        if not data:
            raise Exception('Socket not communicating with the client')
        print(f"Received {data.decode()!r}")

        if (message == 'quit'):
            writer.write(b"quit")
            writer.close()
            await writer.wait_closed()
            exit(2)
            # break # Don't know if this is necessary


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    loop.run_until_complete(run_client())
nunodsousa
  • 2,635
  • 4
  • 27
  • 49
  • 1
    Possible solution: [https://stackoverflow.com/a/29102047/2823755](https://stackoverflow.com/a/29102047/2823755). A few answers suggest using aioconsole package - [https://stackoverflow.com/a/48865532/2823755](https://stackoverflow.com/a/48865532/2823755). Search with `python asyncio input blocks site:stackoverflow.com` – wwii Nov 17 '22 at 01:11

3 Answers3

2

The client blocks on the input() function. This question is similar to server stop receiving msg after 1 msg receive

dirck
  • 838
  • 5
  • 10
  • Is this question a duplicate of the link in your answer? – wwii Nov 17 '22 at 00:57
  • 1
    It is very similar; I believe the cause is the same but the code and symptoms are a bit different, which makes it not entirely obvious. In the first question, the client isn't sending; in this question, the client isn't receiving. Underlying cause is the same: you can't use the `input()` function on the same thread as a running asyncio loop. If you think that's a duplicate you can mark it so. – dirck Nov 17 '22 at 01:01
1

Finally, I found a possible solution, by separating the thread.

import asyncio
import websockets
import warnings
warnings.filterwarnings("ignore")

async def send_msg(websocket):
    while True:
        imp = await asyncio.get_event_loop().run_in_executor(None, lambda: input("Enter something: "))
        print("MESSAGE: ", imp)
        await websocket.send(imp)
        #return imp

async def recv_msg(websocket):
    while True:
        msg = await websocket.recv()
        print(f":> {msg}")


async def echo_loop():
    uri = f"ws://localhost:8765"
    async with websockets.connect(uri, ssl=None) as websocket:
        while True:
           await asyncio.gather(recv_msg(websocket),send_msg(websocket))


if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(echo_loop())
    asyncio.get_event_loop().run_forever()
nunodsousa
  • 2,635
  • 4
  • 27
  • 49
0

It seems that there is no clear solution. In particular, there have been many changes in python since the early releases of asyncio, so many possible solutions are outdated.

I change the code to use WebSockets. However, the problem persists: input blocks the code, and none of the solutions above have solved my problem.

Below is the new version of the code (and the error remains):

Server

import asyncio
import websockets
import time

class Timer(object):
    '''Simple timer class that can be started and stopped.'''

    def __init__(self, websocket, name=None, interval=1) -> None:
        self.websocket = websocket
        self.name = name
        self.interval = interval

    async def _tick(self) -> None:
        while True:
            await asyncio.sleep(self.interval)
            await self.websocket.send("tick")
            print("Delta time: ", time.time() - self._init_time)

    async def start(self) -> None:
        self._init_time = time.time()
        self.task = asyncio.create_task(self._tick())

    async def stop(self) -> None:
        self.task.cancel()
        print("Delta time: ", time.time() - self._init_time)

async def handler(websocket):
    print("[WS-SERVER] client connected")
    while True:
        try:
            msg = await websocket.recv()
            print(f"<: {msg}")
            await websocket.send("Message received. {}".format(msg))
            if(msg == "start"):
                timer = Timer(websocket)
                await timer.start()

        except websockets.ConnectionClosed:
            print("[WS-SERVER] client disconnected")
            break

async def main():
    async with websockets.serve(handler, "localhost", 8765):
        print("[WS-SERVER] ready")
        await asyncio.Future()  # run forever

if __name__ == "__main__":
    asyncio.run(main())

Client

import asyncio
import websockets


'''async function that recieves and prints messages from the server'''
async def recieve_message(websocket):
    msg1 = await websocket.recv()
    print(f"<: {msg1}")

async def send_message(websocket):
    msg = input("Put your message here: ")
    await websocket.send(msg)
    print(":> Sent message: ", msg)

async def handler():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:

        while True:
            '''run input() in a separate thread'''

            recv_msg, send_msg = await asyncio.gather(
                recieve_message(websocket),
                send_message(websocket),
                return_exceptions=True)

            if(send_msg == "test"):
                print("Supertest")


async def main():
    await handler()
    await asyncio.Future()  # run forever

if __name__ == "__main__":
    asyncio.run(handler())
    print("[WS-CLIENT] bye")
nunodsousa
  • 2,635
  • 4
  • 27
  • 49
  • 1
    You're not running input() in a separate thread, you're running it in the same thread in the same asyncio loop, but in a different coroutine. Running threads requires using the python threading module https://docs.python.org/3/library/threading.html. I'm sure there are other posts that would be relevant. You could try looking here https://stackoverflow.com/questions/32889527/is-there-a-way-to-use-asyncio-queue-in-multiple-threads – dirck Nov 18 '22 at 06:05