I apologize if this is a repeat question: I have looked and haven't found an existing one that answers it. I have a Python script that connects my computer to a piece of hardware using a static IP address and port. The hardware allows only one connection at a time on this port. My first issue is that asyncio.open_connection() returns successfully even if another "user" is already connected to the device. When a true connection happens, the hardware sends a connection status message, which, in my case, I do not receive until after the other "user" disconnects. While annoying, I can work around this by waiting for the status message after "connecting" before allowing my script to proceed.
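For reference, the workaround looks roughly like the sketch below. The helper name `wait_for_status`, the 1024-byte read size, and the default timeout are placeholders I made up for illustration; the real status message has its own format, which I am not parsing here. The idea is simply to treat the connection as real only once the hardware has sent something.

```python
import asyncio
from typing import Optional


async def wait_for_status(reader: asyncio.StreamReader,
                          timeout: float = 5.0) -> Optional[bytes]:
    """Return the first bytes the hardware sends, or None if nothing usable arrives.

    Hypothetical helper: any data received within `timeout` seconds is assumed
    to be the connection status message.
    """
    try:
        data = await asyncio.wait_for(reader.read(1024), timeout=timeout)
    except asyncio.TimeoutError:
        # Nothing arrived in time, e.g. another "user" still holds the connection
        return None
    # read() returns b'' when the peer has closed the stream
    return data or None
```

I call this right after asyncio.open_connection() returns and only start the keep-alive task if the result is not None.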
My bigger issue is that I have no way of knowing when my physical connection has been removed. For instance, I am connected to the hardware over USB. The hardware requires that I send a keep-alive message every 5 seconds, but it does not send a response to the keep-alive messages. If I pull the USB cable out of the device, I would expect to receive errors when writing the keep-alive message, but I do not.
My script involves multiple concurrent asyncio tasks, but this simplified example should suffice. I would expect an error from self.writer.write() or self.writer.drain() after I yank out the USB cable, but I receive no indication of any change in the connection. My code just carries on and continues to send keep-alive messages. What am I missing?
import asyncio
import logging
from typing import TypeVar

logger = logging.getLogger(__name__)

host = '169.254.13.95'
port = 51717
timeout_sec = 10
lock = asyncio.Lock()

# if using 3.11 or greater, `from typing import Self` replaces this line
Self = TypeVar("Self", bound="TcpConnection")


class TcpConnection:
    """A sample TCP connection class to demonstrate my point"""

    def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        self.reader: asyncio.StreamReader = reader
        self.writer: asyncio.StreamWriter = writer

    @classmethod
    async def connect(cls, host: str, port: int) -> Self | None:
        """Open the connection; return None if the attempt fails."""
        logger.info(f'Connecting to {host}:{port}')
        try:
            reader, writer = await asyncio.open_connection(host=host, port=port)
        except ConnectionRefusedError:
            logger.info(f'Connect call refused ({host}:{port})')
        except OSError:
            logger.info(f'Connect call failed ({host}:{port})')
        except Exception as e:
            logger.warning(f'Unknown exception caught:\n{e}')
        else:
            logger.info('Connected')
            return cls(reader, writer)
        return None

    def is_connected(self) -> bool:
        # Only reflects a local close; it does not probe the remote end
        return not self.writer.is_closing()

    async def keep_alive(self) -> None:
        logger.info('Starting keep alive task')
        keep_alive_msg = b'\x00'
        while self.is_connected():
            async with lock:
                self.writer.write(keep_alive_msg)
                await self.writer.drain()
            logger.debug('Sent keep alive message')
            await asyncio.sleep(4.5)  # don't wait the full 5 seconds just in case
        logger.info('Terminating keep alive task')


async def main() -> None:
    while True:
        tcp = await TcpConnection.connect(host, port)
        if tcp and tcp.is_connected():
            try:
                # create a task to run the keep alive message
                keep_alive_task = asyncio.create_task(tcp.keep_alive())
                await keep_alive_task
            except ConnectionError:
                logger.info('Client disconnected')
        logger.info(f'Waiting {timeout_sec} seconds before trying to reconnect')
        await asyncio.sleep(timeout_sec)


if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S',
                        level=logging.DEBUG)
    try:
        logger.info('Starting application')
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info('Exiting application')
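One direction I am considering, but have not confirmed on this hardware, is turning on OS-level TCP keepalive for the underlying socket so that the kernel probes the peer itself. The sketch below is only that: the helper name `enable_tcp_keepalive` and the timing values are my own placeholders, the `TCP_KEEP*` options are platform-specific (hence the guards), and I do not know whether the device answers keepalive probes.

```python
import socket


def enable_tcp_keepalive(sock, idle_sec: int = 5, interval_sec: int = 2, probes: int = 3) -> None:
    """Ask the OS to probe an idle TCP connection (hypothetical helper).

    `sock` can be a socket.socket or the object returned by
    writer.get_extra_info('socket'); both expose setsockopt().
    """
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    # These tuning options are not available on every platform, so each is
    # applied only if the constant exists and the OS accepts it.
    tuning = (
        ("TCP_KEEPIDLE", idle_sec),       # idle seconds before the first probe
        ("TCP_KEEPINTVL", interval_sec),  # seconds between probes
        ("TCP_KEEPCNT", probes),          # failed probes before the OS drops the connection
    )
    for name, value in tuning:
        option = getattr(socket, name, None)
        if option is None:
            continue
        try:
            sock.setsockopt(socket.IPPROTO_TCP, option, value)
        except OSError:
            pass  # constant is defined but the OS rejected it; keep the defaults
```

In connect(), this would be called as enable_tcp_keepalive(writer.get_extra_info('socket')) right after asyncio.open_connection() returns. Would that be a reasonable way to surface the unplugged cable as an error, or is there a more idiomatic asyncio approach?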