I apologize if this is a repeat question: I have looked and haven't found one that answers it. I have a Python script that allows my computer to connect to a piece of hardware using a static IP address and port. This piece of hardware only allows one connection at a time on this port. My first issue is that asyncio.open_connection() returns a successful connection status even if there is already another "user" connected to the device. When a true connection happens, the hardware sends a connection status message which, in my case, I do not receive until after the other "user" disconnects. While annoying, I can work around this issue by waiting for the status update message after "connecting" before allowing my script to proceed.

My bigger issue is that I do not have a way of knowing when my physical connection has been removed. For instance, I am connected to the hardware using a USB connection. The hardware requires that I send a keep alive message every 5 seconds but it does not send a response to the keep alive messages. If I pull the USB cable out of the device I would expect to receive errors when writing the keep alive message but I do not.

My script involves multiple concurrent asyncio tasks, but this simplified example should suffice. I would expect to receive an error when calling self.writer.write() or self.writer.drain() after I yank out the USB cable but I receive no indication of any change in the connection. My code just eats it and continues to send keep alive messages. What am I missing?

import asyncio
import logging
from typing import TypeVar

logger = logging.getLogger(__name__)
host = '169.254.13.95'
port = 51717
timeout_sec = 10
lock = asyncio.Lock()

# On Python 3.11+ use `from typing import Self` instead of this line
Self = TypeVar("Self", bound="TcpConnection")


class TcpConnection:
    """A sample TCP connection class to demonstrate my point"""

    def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        self.reader: asyncio.StreamReader = reader
        self.writer: asyncio.StreamWriter = writer

    @classmethod
    async def connect(cls, host: str, port: int) -> Self | None:
        connection = None
        logger.info(f'Connecting to {host}:{port}')
        try:
            reader, writer = await asyncio.open_connection(host=host, port=port)
            logger.info('Connected')
            connection = cls(reader, writer)
        except ConnectionRefusedError:
            logger.info(f'Connect call refused ({host}:{port})')
        except OSError:
            logger.info(f'Connect call failed ({host}:{port})')
        except Exception as e:
            logger.warning(f'Unknown exception caught:\n{e}')
        # return after the try block: a return inside `finally` would also swallow cancellation
        return connection

    def is_connected(self) -> bool:
        return not self.writer.is_closing()

    async def keep_alive(self) -> None:
        logger.info('Starting keep alive task')
        keep_alive_msg = b'\x00'
        while self.is_connected():
            async with lock:
                self.writer.write(keep_alive_msg)
                await self.writer.drain()
                logger.debug('Sent keep alive message')
            await asyncio.sleep(4.5)  # don't wait the full 5 seconds just in case
        logger.info('Terminating keep alive task')


async def main() -> None:
    while True:
        tcp = await TcpConnection.connect(host, port)

        if tcp and tcp.is_connected():
            try:
                # create a task to run the keep alive message
                keep_alive_task = asyncio.create_task(tcp.keep_alive())
                await keep_alive_task
            except ConnectionError:
                logger.info('Client disconnected')
        logger.info(f'Waiting {timeout_sec} seconds before trying to reconnect')
        await asyncio.sleep(timeout_sec)


if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S',
                        level=logging.DEBUG)
    try:
        logger.info('Starting application')
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info('Exiting application')
  • Try again. This is definitely not your program. You never call `keep_alive` nor create it as a Task, and you use the await keyword outside of an async def function. Please test your code and proofread your question before posting it. – Paul Cornelius May 31 '23 at 02:36
  • Thanks @PaulCornelius. In my haste to get something submitted whilst keeping it vague, I submitted broken code. I have updated the code. – Steven Krick May 31 '23 at 13:47

1 Answer

> My first issue is that asyncio.open_connection() returns a successful connection status even if there is already another "user" connected to the device.

Establishing a connection is done inside the OS kernel and the kernel can do this for many connections in parallel, even if the user space application handles only one connection at a time. There is no way around it.
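
This behaviour can be reproduced locally. The following is a minimal sketch, assuming a loopback listener stands in for the device: the server socket listens but never calls accept(), and the client's connect() still succeeds because the kernel completes the handshake on its own. The function name `connect_without_accept` is made up for this illustration.

```python
import socket


def connect_without_accept() -> bool:
    """Return True if connect() succeeds against a listener that never calls accept()."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    server.listen(1)               # from here on the kernel completes handshakes itself
    port = server.getsockname()[1]

    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.settimeout(2.0)
    try:
        client.connect(("127.0.0.1", port))  # accept() is never called on the server
        return True
    except OSError:
        return False
    finally:
        client.close()
        server.close()


if __name__ == "__main__":
    print("connect() succeeded:", connect_without_accept())
```

This is why waiting for the device's own status message, as the question already does, is the reliable signal that the device has really taken the connection.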

> The hardware requires that I send a keep alive message every 5 seconds but it does not send a response to the keep alive messages.

TCP is about reliability. It will try to retransmit the data, and these retransmission attempts only time out after a while. It does not react immediately to a broken link: it may not even notice the break, or it may hope that the link is re-established in time for the retransmission to succeed.

If you want immediate notice, the peer would need to send some feedback that it received your data, and you could react when this feedback does not arrive. But this is not how keep alive seems to be designed in your case: it is just about keeping the connection alive (i.e. preventing firewalls from dropping the state of an idle connection) and not about immediately detecting broken links.

> would expect to receive an error when calling self.writer.write()

A write just delivers the data to the local socket buffer. It therefore cannot tell you whether something went wrong while delivering the data to the peer. It will return an error once the socket has been marked as broken because retransmissions of the previous data have ultimately failed, but this happens some time after the original data was written to the socket.
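
The buffering can be seen with plain sockets. This is a small sketch, assuming a loopback peer that accepts the connection but never reads from it; the function name `send_to_unread_peer` is invented for the example.

```python
import socket


def send_to_unread_peer(payload: bytes = b"\x00") -> int:
    """Send to a peer that never calls recv(); return the byte count reported by send()."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    client = socket.create_connection(server.getsockname(), timeout=2.0)
    peer, _ = server.accept()  # connected, but recv() is never called on `peer`
    try:
        # send() returns once the kernel has copied the bytes into its buffer;
        # it says nothing about whether the peer received or processed them.
        return client.send(payload)
    finally:
        client.close()
        peer.close()
        server.close()


if __name__ == "__main__":
    print("bytes accepted by the local kernel:", send_to_unread_peer())
```

The asyncio writer.write() and writer.drain() calls sit on top of the same mechanism, so they complete without error in the same way as long as the local buffer has room.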

Steffen Ullrich
  • Thank you, @SteffenUllrich. That is what I was afraid of. I tested your theory and it took 15 minutes for the underlying socket to detect the disconnect. I've written similar code in Java which responds almost instantly when there is a disconnect. I'm still not sure I understand why Python takes so long. – Steven Krick May 31 '23 at 14:12
  • @StevenKrick: have a look at [TCP keepalive](https://stackoverflow.com/questions/12248132/how-to-change-tcp-keepalive-timer-using-python-script). With the right timing (and OS support) it should detect it faster. But I doubt the "almost instantly" with Java, at least for applications which don't implement their own heartbeat. – Steffen Ullrich May 31 '23 at 15:55
  • I am marking this complete as the answer most closely aligns with my issue. I really struggled to get the low-level socket library to detect a disconnect with any "urgency", so instead I am writing a command over TCP that I know elicits a response and treating it as my keep alive message. The response is only 8 bytes, so hardly a bandwidth killer. – Steven Krick May 31 '23 at 21:17
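
For reference, the TCP keepalive suggestion from the comments could be sketched roughly as follows. The helper name `enable_tcp_keepalive` and the timing values are assumptions chosen for illustration. The tuning options are platform-specific, so each one is guarded with hasattr(). TCP_USER_TIMEOUT is a Linux-only option that bounds how long sent data may stay unacknowledged, which is probably the more relevant knob when data is written every few seconds.

```python
import socket


def enable_tcp_keepalive(sock: socket.socket, idle: int = 5, interval: int = 2,
                         count: int = 3, user_timeout_ms: int = 10_000) -> None:
    """Turn on kernel-level keepalive probes; timing values here are illustrative."""
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    if hasattr(socket, "TCP_KEEPIDLE"):      # idle seconds before the first probe
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle)
    if hasattr(socket, "TCP_KEEPINTVL"):     # seconds between probes
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
    if hasattr(socket, "TCP_KEEPCNT"):       # unanswered probes before giving up
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, count)
    if hasattr(socket, "TCP_USER_TIMEOUT"):  # Linux only: max ms data may stay unacknowledged
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, user_timeout_ms)


if __name__ == "__main__":
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    enable_tcp_keepalive(s)
    print("SO_KEEPALIVE enabled:", s.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0)
    s.close()
```

With the asyncio streams from the question, the underlying socket is available through writer.get_extra_info("socket"), so the call would be enable_tcp_keepalive(writer.get_extra_info("socket")) right after connecting. How quickly a pulled cable is then reported still depends on the operating system.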