
I have a Python script that 'taps' a serial link between two devices.

It opens two serial ports and writes data arriving on each port back out to the other port.

The serial stream has a protocol which the script decodes. So, for example, byte 0x01 might mean 'COMMAND_1', which the script knows needs 4 more bytes to form the packet. 0x02 might mean the next 6 bytes form a complete packet.

So, my 'decode' routine successfully:

  1. Identifies when a packet-start byte arrives
  2. Decides how many further bytes are needed to form the packet
  3. Maintains a list of 'in-flight' bytes to build up the packet
  4. When a full packet is received, the list is cleared and the routine repeats.
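The steps above can be sketched in isolation as follows. This is only a minimal illustration, not the actual routine: the `EXTRA_BYTES` table and the `split_packets` function are hypothetical names, and the byte counts simply mirror the 0x01 and 0x02 examples given earlier.

```python
# Hypothetical table: start byte -> number of further bytes in the packet.
# The values mirror the example in the text (0x01 needs 4 more, 0x02 needs 6).
EXTRA_BYTES = {0x01: 4, 0x02: 6}


def split_packets(stream):
    """Yield complete packets (as bytes) found in an iterable of ints."""
    inflight = []      # bytes collected for the packet being built
    remaining = 0      # further bytes needed to finish that packet

    for b in stream:
        if not inflight:
            if b not in EXTRA_BYTES:
                continue  # not a known start byte: nothing to collect
            remaining = EXTRA_BYTES[b]
            inflight.append(b)
        else:
            inflight.append(b)
            remaining -= 1

        if inflight and remaining == 0:
            yield bytes(inflight)
            inflight = []  # start afresh for the next packet


packets = list(split_packets([0x01, 10, 11, 12, 13, 0xFF, 0x02, 1, 2, 3, 4, 5, 6]))
```

Each packet is converted to an immutable `bytes` object before the working list is reset, so later resets cannot affect packets that have already been handed out.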

Note that all received bytes are copied to the other serial port.

The problem is that my routine introduces lag: when bytes arrive quickly and then stop, it takes a few seconds for the backlog to pass through the decode process and be written to the other port.

After a bit of debugging, it looks as though resetting the inflight_bytes list is the slow part. I've tried both assigning a new list (inflight_bytes = []) and clearing the existing one (inflight_bytes.clear()), and both cause the delay.

Here's the routine in question, since I know it's better to show than tell :)

class StreamParser(object):
    """
    Extracts IKBD packets from sequences of bytes.
    """
    def __init__(self, packet_defs):
        """ Sets up an empty parser state.

        :param packet_defs: A packet class factory
        :type packet_defs:  A class which implements get_struct_class, which returns a class
                            appropriate for the packet which a byte signifies
        """
        self.pending_packet_class = None
        self.inflight_bytes = []
        self.packet_length = 0
        self.packet_defs = packet_defs

    def parse_byte(self, b):
        """ Collects bytes into packets.

            - Checks if a byte signifies the start of a packet
            - If so, gets the class which represents the indicated packet
            - Gets the number of additional bytes the class wants to complete the packet
            - For further calls, collect enough bytes to populate the packet class
              (in the 'inflight_bytes' list)
            - When a packet has been completed, instantiates the packet class and sets
              its fields, then resets state (clears inflight_bytes, pending_packet_class
              and packet_length)

            Returns:

            A tuple of either:

            byte, packet    - when a complete packet has been built
            byte, None      - when a complete packet is yet to be ready

            ...byte is always the byte which arrived, so it can be transparently
            passed through to the other port. 
        """


        is_new_packet = False

        if not self.pending_packet_class:
            # no in-flight packet - assume 'b' as the start of a new packet

            # get the class which represents this packet.  
            # packet_defs.get_struct_class returns a cls, not an instance of the class
            self.pending_packet_class = self.packet_defs.get_struct_class(b)

            if not self.pending_packet_class:
                # this byte doesn't match a known start-of-packet - just return the byte
                return b, None
            else:

                # remember how many further bytes we need to complete the packet,
                # and stash the byte in our inflight_bytes list
                self.packet_length = len(self.pending_packet_class._fields_)
                self.inflight_bytes.append(b)
                is_new_packet = True


        # check if we can use this byte to populate an in-flight packet's fields
        if (not is_new_packet) and (len(self.inflight_bytes) < self.packet_length):
            self.inflight_bytes.append(b)


        # do we have a complete packet?
        if (len(self.inflight_bytes) == self.packet_length): 
            # instantiate the packet class and populate its fields
            packet = self.pending_packet_class()
            packet.values = self.inflight_bytes  

            # reset ourselves, since the next byte will be a 'start-of-packet'
            self.packet_length = 0
            self.pending_packet_class = None

            # ** This line causes a processing backlog! **
            # When omitted, packets are returned as quickly as they arrive
            # Can't figure out how to clear the list without introducing a delay...
            self.inflight_bytes.clear()

            # return the byte and the packet
            return b, packet


        # return the byte - no packet yet.
        return b, None

Edit to add: the link runs at a rather pedestrian 7812.5 baud, and all the blocking I/O elsewhere is handled with asyncio, so I think everything else is going as quickly as it can.

Further edit:

Below is a script which I'd hoped would illustrate the problem.

import asyncio
import serial_asyncio

class ByteParser(object):
    def __init__(self):
        self.inflight_bytes = []

    def parse_byte(self, b):
        if len(self.inflight_bytes) < 5:
            self.inflight_bytes.append(b)
            return b, None
        else:
            packet = self.inflight_bytes.copy()
            self.inflight_bytes = []
            return b, packet         

async def port_reader(port):
    while True:
        b = await port.read(1)
        yield b[0]

async def parse_byte(in_port, out_port):
    parser = ByteParser()
    async for b in port_reader(in_port):
        b, packet = parser.parse_byte(b)

        # Keep below commented when testing performance.  Printing
        # to the console slows us down
        #if packet:
        #    print("Got whole packet")

        # StreamWriter.write expects a bytes-like object, so wrap the int
        out_port.write(bytes([b]))

async def main():
    read_port, _ = await serial_asyncio.open_serial_connection(url='/dev/ttySC1', baudrate=7812.5)
    _, write_port = await serial_asyncio.open_serial_connection(url='/dev/ttySC0', baudrate=7812.5)

    asyncio.ensure_future(parse_byte(read_port, write_port))

loop = asyncio.get_event_loop()
task = loop.create_task(main())
loop.run_forever()

However, this does not introduce a delay in transmitting the output stream. So, it seems that clearing the inflight_bytes list isn't causing the delay itself.

My thinking now is that it's the cumulative time taken to execute the parse_byte routine, rather than any single part of it.

I'll go back through and see where I can shave some execution time.
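One way to check that idea is to compare the mean time per parse_byte call against the time available per byte on the wire. The sketch below is a rough measurement under stated assumptions: it assumes 10 bits per byte (1 start, 8 data, 1 stop), and `dummy_parse` and `mean_call_time` are hypothetical names, with `dummy_parse` standing in for the real `parser.parse_byte`.

```python
import time

BAUD = 7812.5
BITS_PER_BYTE = 10  # assumption: 1 start + 8 data + 1 stop bit
BYTE_BUDGET = BITS_PER_BYTE / BAUD  # seconds available per byte


def mean_call_time(func, data):
    """Return the mean wall-clock seconds per call of func over data."""
    start = time.perf_counter()
    for b in data:
        func(b)
    return (time.perf_counter() - start) / len(data)


def dummy_parse(b):
    # Stand-in for parser.parse_byte; replace with the real method.
    return b, None


per_call = mean_call_time(dummy_parse, bytes(range(256)) * 40)
print(f"budget per byte: {BYTE_BUDGET * 1e3:.2f} ms")
print(f"mean parse time: {per_call * 1e3:.4f} ms")
```

If the mean parse time is well under the per-byte budget, the parser itself is unlikely to be the source of a multi-second backlog, and the delay is more likely to come from how the bytes are read or written.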

displaced
  • `packet.values = self.inflight_bytes` doesn't make a copy, it just adds another reference to the list. The `self.inflight_bytes.clear()` call then clears the list, so `packet.values` now references an empty list. Is that intentional? – Martijn Pieters Mar 16 '19 at 13:09
  • In fact, the code you posted can't ever produce a `packet` object with anything but an empty `.values` list. I've duplicated you to the canonical post on how to clone a list for now, but there is no [MCVE] here for us to do anything other than close this post. – Martijn Pieters Mar 16 '19 at 13:15
  • Thanks Martijn - I'll put together a suitable example and update. Is it acceptable to edit the code in the existing post, or should I create another? – displaced Mar 16 '19 at 13:18
  • It's more than acceptable to edit your post when it was closed, it is explicitly encouraged even. However, your edit doesn't make your question any more answerable than it was before, sorry. We'd need something reproducible here. – Martijn Pieters Mar 16 '19 at 13:37
  • Thanks again, Martijn. It seems that I was chasing the wrong issue here. I've updated the question with some code which proves the assumption behind the question was incorrect. Thanks for taking the time to assist! – displaced Mar 16 '19 at 14:12
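The aliasing described in the first two comments can be reproduced on its own. This is a minimal sketch: `Packet` is a hypothetical stand-in for the real packet class, and the byte values are made up for illustration.

```python
class Packet:
    """Hypothetical stand-in for the real packet class."""
    values = None


inflight = [0x01, 0x10, 0x20]

aliased = Packet()
aliased.values = inflight          # same list object, no copy

copied = Packet()
copied.values = list(inflight)     # independent shallow copy

inflight.clear()                   # empties the one shared list in place

print(aliased.values)
print(copied.values)
```

After the `clear()` call, `aliased.values` is empty while `copied.values` still holds the three bytes. Rebinding the name to a fresh list (`inflight = []`) instead of calling `clear()` would also leave the packet's list intact, because the old list object is never modified.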
