
I am communicating with a server using asyncio, with timeouts (on the order of 2 seconds) if the server does not respond. Sometimes I want to log this data to file. I find that the file IO (at, say, 6-7 MB/s) can cause timeouts in my comms with the server. On a Windows laptop I don't have much of an issue, but on my RPi4 (which has slow file IO) it is a constant problem. I tried various things and eventually decided to move the file IO to a completely separate process so the 'main' part is unaffected by the file IO work. However, whilst better, it still causes timeouts occasionally.

Below are snippets of my code. What is strange is that if I comment out `log_file.write(output)` (meaning everything other than the file write is still processed), everything works fine and no timeouts are seen. So, somehow, my file IO is still coupled to my 'main' process.

What am I misunderstanding or doing wrong here? Why is the file write still affecting my 'main' process?

import multiprocessing
from multiprocessing.connection import Connection

# Setup a pipe and the listener (multiprocessing.Process)

def file_logger_mp(log_file_pth: str, append: bool = True): # My setup function
    conn_rec, conn_send = multiprocessing.Pipe(duplex=False) # I believe duplex=False reduces overhead
    listener = LogListenerProcess(conn_rec, log_file_pth, append)
    log_hdl = LogHandler(conn_send, listener)

    listener.start()
    return log_hdl.send # This is called as the `callback_fn()` below
# Within LogHandler() class
class LogHandler():
    def __init__(self, conn_send: Connection, listener: LogListenerProcess) -> None:     
        self.conn_send = conn_send
        self.listener = listener

    def send(self, msg):
        self.conn_send.send(msg) # Simply put the data onto the pipe
# This writes to file
class LogListenerProcess(multiprocessing.Process):
    def __init__(self, pipe_conn, log_file_path: str, append: bool = True,
                 app_only_to_console: bool = False):
        multiprocessing.Process.__init__(self)
        self.exit = multiprocessing.Event()
        self.pipe_conn = pipe_conn
        self.log_file_path = log_file_path
        self.buffered_console_stdout = None
        self.pipe_data = None
        self.append = append
        self.app_only_to_console = app_only_to_console

    def run(self):
        if self.append:
            log_file = open(self.log_file_path, "a")
        else:
            log_file = open(self.log_file_path, "w+")

        # Continue until told to exit and while there is data present in the pipe
        while not self.exit.is_set() or self.pipe_conn.poll():
            try:
                self.pipe_data = self.pipe_conn.recv()

                if self.pipe_data is None:  # Send None to force a break. Currently unused but here in case needed.
                    print("Closing")
                    break

                for record in self.pipe_data:
                    if isinstance(record, list):
                        output = "\n".join(map(str, record))+"\n"
                    else:
                        output = str(record)+'\n'
                    log_file.write(output)
                    
                # Without this, seems system will hold the data until program closes
                # https://stackoverflow.com/a/9824894
                log_file.flush()
                self.pipe_data.clear()

            except Exception:
                pass  # Snip

# Within an asyncio function, call the hdl to push data into the pipe
for callback_fn in self.cbs:
    callback_fn(msg) # This calls log_hdl.send()
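
To show the coupling more concretely, here is a stripped-down sketch (illustrative only, not my application code) where a sleep stands in for the slow file write. The per-send timing printout shows `send()` suddenly stalling once the consumer falls behind:

import multiprocessing
import time

def slow_consumer(conn):
    # Stand-in for the listener process: a sleep simulates a slow file write
    while True:
        data = conn.recv()
        if data is None:
            break
        time.sleep(0.5)

if __name__ == "__main__":
    conn_rec, conn_send = multiprocessing.Pipe(duplex=False)
    proc = multiprocessing.Process(target=slow_consumer, args=(conn_rec,))
    proc.start()

    payload = "x" * 8192  # 8 KiB per message (sizes are illustrative)
    for i in range(32):
        t0 = time.monotonic()
        conn_send.send(payload)  # Stalls once the OS pipe buffer is full
        print(f"send {i}: {time.monotonic() - t0:.3f}s")

    conn_send.send(None)  # Sentinel so the consumer exits
    proc.join()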

This would be much easier to debug if you made an MRE. From what is visible here, I would have to guess that the `log_hdl.send()` call is blocking. See, for example, [this](https://stackoverflow.com/a/44924898/3324095) answer. You could see whether using a queue or increasing the OS pipe memory buffer size makes a difference. – FiddleStix Jun 21 '23 at 10:48
@FiddleStix you were spot on...changing to Queues worked. I think this answer summarises my issue well: https://stackoverflow.com/a/75715033. I think what is happening is that while the Process is busy writing to file, the `recv()` is not being serviced and so the `send()` is essentially blocked (maybe Linux has a smaller pipe buffer than Windows). – SimpleOne Jun 21 '23 at 11:13
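
For anyone hitting the same thing, a minimal sketch of the queue-based variant described above, assuming the same handler/listener split (the class and function names here are illustrative, not from the original code). `multiprocessing.Queue.put()` hands the data to a background feeder thread and returns almost immediately, so the producer no longer stalls while the listener is busy writing:

import multiprocessing

def file_logger_mp_queue(log_file_pth: str, append: bool = True):
    queue = multiprocessing.Queue()
    listener = QueueLogListenerProcess(queue, log_file_pth, append)
    listener.start()
    return QueueLogHandler(queue).send

class QueueLogHandler:
    def __init__(self, queue):
        self.queue = queue

    def send(self, msg):
        self.queue.put(msg)  # Returns quickly; a feeder thread drains the buffer

class QueueLogListenerProcess(multiprocessing.Process):
    def __init__(self, queue, log_file_path: str, append: bool = True):
        super().__init__()
        self.queue = queue
        self.log_file_path = log_file_path
        self.append = append

    def run(self):
        with open(self.log_file_path, "a" if self.append else "w+") as log_file:
            while True:
                pipe_data = self.queue.get()  # Blocks until data arrives
                if pipe_data is None:  # Sentinel to shut the listener down
                    break
                for record in pipe_data:
                    if isinstance(record, list):
                        output = "\n".join(map(str, record)) + "\n"
                    else:
                        output = str(record) + "\n"
                    log_file.write(output)
                log_file.flush()

The trade-off is that an unbounded queue can grow in memory if the listener cannot keep up with the producer.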
