I've got two Python programs running. Program A connects to program B with the multiprocessing module:

# Connection code in program A
# -----------------------------
import multiprocessing
import multiprocessing.connection

...

connection = multiprocessing.connection.Client(
    ('localhost', 19191),                # <- address of program B
    authkey='embeetle'.encode('utf-8')   # <- authorization key
)

...

connection.send(send_data)

recv_data = connection.recv()

It works perfectly most of the time. However, sometimes program B is frozen (the details don't matter much, but it usually happens when the GUI from program B spawns a modal window).
While program B is frozen, program A hangs at the following line:

connection = multiprocessing.connection.Client(
    ('localhost', 19191),                # <- address of program B
    authkey='embeetle'.encode('utf-8')   # <- authorization key
)

It keeps waiting for a response. I would like to put a timeout parameter, but the call to multiprocessing.connection.Client(..) does not have one.

How can I implement a timeout here?

 
Notes:
I'm working on a Windows 10 computer with Python 3.7.

K.Mulier
  • [`.Client` is a function](https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/connection.py#L483). Answers to [Timeout on a function call](https://stackoverflow.com/questions/492519/timeout-on-a-function-call) delineate a number of methods (there are other similar SO Q&A's reachable with a search). Many methods involve decorators; if one is useful, you could wrap/redefine `.Client` with it - `Client = timeout_decorator(...Client)` and use the wrapped function. – wwii Sep 06 '19 at 16:16
  • If you think that your question is a duplicate of [Timeout on a function call](https://stackoverflow.com/questions/492519/timeout-on-a-function-call) please comment so we can mark it as such. – wwii Sep 06 '19 at 17:08
  • Hi @wwii, the **Timeout on a function call** question is indeed a very good approach to this! Thanks a lot. However, I noticed the accepted answer to that question is for UNIX only. What do you consider to be the best answer for Windows users? – K.Mulier Sep 08 '19 at 09:11
  • caveat: I have no practical experience with this other than playing with stuff I find here on SO. I think I would try using this answer - https://stackoverflow.com/a/31667005/2823755. It uses `threading.Timer` to raise `KeyboardInterrupt` that could be caught. It would be nice to have a minimal working example to play around with: something that mimics your Programs A and B that try to connect with each other and nothing much else - to make sure that doing this with `multiprocessing.connection.Client` doesn't cause any unwanted side effects. – wwii Sep 08 '19 at 16:54
  • Specifically - https://stackoverflow.com/questions/34871191/cant-close-socket-on-keyboardinterrupt-python, https://stackoverflow.com/questions/47847392/keyboard-interrupt-sockets-and-threads. Might need something more elaborate using https://stackoverflow.com/a/31667005/2823755 as a starting point. – wwii Sep 08 '19 at 17:30
  • can you add also the code for program B ? – kederrac Sep 14 '19 at 08:49
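
The idea discussed in these comments, putting a time limit around a blocking call, can also be sketched without signals, so it is not tied to UNIX. The example below is only an illustration and is a different technique from the `threading.Timer` answer linked above: it runs the call in a daemon thread and stops waiting after `timeout` seconds. `call_with_timeout` is a hypothetical helper name, and note the trade-off that the abandoned thread keeps blocking in the background until the process exits.

```python
import threading


def call_with_timeout(func, timeout, *args, **kwargs):
    """Run func(*args, **kwargs) in a daemon thread; give up after `timeout` seconds.

    Hypothetical helper for illustration. If the call does not finish in
    time, TimeoutError is raised and the worker thread is simply abandoned.
    """
    result = {}

    def target():
        try:
            result["value"] = func(*args, **kwargs)
        except BaseException as exc:  # hand any failure back to the caller
            result["error"] = exc

    worker = threading.Thread(target=target, daemon=True)
    worker.start()
    worker.join(timeout)

    if worker.is_alive():
        raise TimeoutError("call did not finish within %s seconds" % timeout)
    if "error" in result:
        raise result["error"]
    return result["value"]
```

Under these assumptions, program A could call `call_with_timeout(multiprocessing.connection.Client, 3, ('localhost', 19191), authkey=b'embeetle')` and catch `TimeoutError`.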

1 Answer


I would like to put a timeout parameter, but the call to multiprocessing.connection.Client(..) does not have one. How can I implement a timeout here?

Looking at the source of multiprocessing.connection in Python 3.7, the Client() function is, for your use case, a fairly brief wrapper around SocketClient(), which in turn wraps Connection().

At first it looked fairly straightforward to write a ClientWithTimeout wrapper that does the same thing, but additionally calls settimeout() on the socket it creates for the connection. However, this does not have the correct effect, because:

  1. Python implements its own socket timeout behaviour by using select() and an underlying non-blocking OS socket; this behaviour is what is configured by settimeout().

  2. Connection operates directly on an OS socket handle, which is returned by calling detach() on the normal Python socket object.

  3. Since Python has set the OS socket handle to the non-blocking mode, recv() calls on it return immediately rather than waiting for the timeout period.
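
The three points above can be observed directly. The following is a minimal sketch, assuming a POSIX system (it reads from the raw descriptor with os.read, which is not how sockets are read on Windows); the function name is made up for illustration. It sets a 3 second Python-level timeout, detaches the socket, and shows that a raw read fails immediately instead of waiting.

```python
import os
import socket
import time


def read_detached_after_settimeout(timeout=3.0):
    """Return (outcome, elapsed_seconds) for a raw read on a detached socket."""
    a, b = socket.socketpair()
    a.settimeout(timeout)   # Python-level timeout; the OS socket becomes non-blocking
    fd = a.detach()         # raw descriptor, similar to what Connection is given
    start = time.monotonic()
    try:
        os.read(fd, 1)      # nothing was sent, so there is no data to read
        outcome = "read returned"
    except BlockingIOError:
        outcome = "BlockingIOError"
    finally:
        elapsed = time.monotonic() - start
        os.close(fd)
        b.close()
    return outcome, elapsed


if __name__ == "__main__":
    print(read_detached_after_settimeout())
```

The read is expected to raise BlockingIOError well before the 3 seconds have passed, which is why settimeout() alone cannot provide the timeout once the handle has been detached.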

However, we can still set a receive timeout on the underlying OS socket handle by using the low-level SO_RCVTIMEO socket option.

Hence the second version of my solution:

from multiprocessing.connection import Connection, answer_challenge, deliver_challenge
import socket, struct

def ClientWithTimeout(address, authkey, timeout):

    with socket.socket(socket.AF_INET) as s:
        s.setblocking(True)
        s.connect(address)

        # We'd like to call s.settimeout(timeout) here, but that won't work.

        # Instead, prepare a C "struct timeval" to specify timeout. Note that
        # these field sizes may differ by platform.
        seconds = int(timeout)
        microseconds = int((timeout - seconds) * 1e6)
        timeval = struct.pack("@LL", seconds, microseconds)

        # And then set the SO_RCVTIMEO (receive timeout) option with this.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, timeval)

        # Now create the connection as normal.
        c = Connection(s.detach())

    # The following code will now fail if a socket timeout occurs.

    answer_challenge(c, authkey)
    deliver_challenge(c, authkey)

    return c

For brevity, I have assumed the parameters are as per your example, i.e.:

  • address is a tuple (implying address family is AF_INET).
  • authkey is a byte string.

If you need to handle cases where these assumptions don't hold then you will need to copy a little more logic from Client() and SocketClient().

Although I looked at the multiprocessing.connection source to find out how to do this, my solution does not use any private implementation details. Connection, answer_challenge and deliver_challenge are all public and documented parts of the API. This function should therefore be safe to use with future versions of multiprocessing.connection.

Note that SO_RCVTIMEO may not be supported on all platforms, but it is present on at least Windows, Linux and macOS. The format of the option value is also platform-specific. I have assumed a struct timeval whose two fields are both of the native unsigned long type. I think this is correct on common POSIX platforms, but it is not guaranteed to always be so. On Windows, Winsock documents the SO_RCVTIMEO value as a single DWORD holding milliseconds rather than a struct timeval, so the packing above needs adjusting there. Unfortunately Python does not currently provide a platform-independent way to do this.
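
One way to contain the platform dependence is to build the option value in a small helper. The sketch below is an illustration rather than part of the tested solution: pack_rcvtimeo is a hypothetical name, the Windows branch assumes the DWORD-of-milliseconds form that Winsock documents for SO_RCVTIMEO, and the other branch assumes a struct timeval made of two native longs.

```python
import struct
import sys


def pack_rcvtimeo(timeout):
    """Build a value for SO_RCVTIMEO / SO_SNDTIMEO from a timeout in seconds.

    Hypothetical helper; the layouts below are assumptions about the platform.
    """
    if sys.platform == "win32":
        # Assumption: Winsock expects one DWORD containing milliseconds.
        return struct.pack("@L", int(timeout * 1000))
    # Assumption: struct timeval is two native longs (seconds, microseconds).
    seconds = int(timeout)
    microseconds = int((timeout - seconds) * 1e6)
    return struct.pack("@ll", seconds, microseconds)
```

The result would be passed as the third argument of s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, ...) in place of the hand-packed timeval.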

Below is a test program which shows this working - it assumes the above code is saved as client_timeout.py.

from multiprocessing.connection import Client, Listener
from client_timeout import ClientWithTimeout
from threading import Thread
from time import time, sleep

addr = ('localhost', 19191)
key = 'embeetle'.encode('utf-8')

# Provide a listener which either does or doesn't accept connections.
class ListenerThread(Thread):

    def __init__(self, accept):
        Thread.__init__(self)
        self.accept = accept

    def __enter__(self):
        if self.accept:
            print("Starting listener, accepting connections")
        else:
            print("Starting listener, not accepting connections")
        self.active = True 
        self.start()
        sleep(0.1)

    def run(self):
        listener = Listener(addr, authkey=key)
        self.active = True
        if self.accept:
            listener.accept()
        while self.active:
            sleep(0.1)
        listener.close()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.active = False
        self.join()
        print("Stopped listener")
        return True

for description, accept, name, function in [
        ("ClientWithTimeout succeeds when the listener accepts connections.",
        True, "ClientWithTimeout", lambda: ClientWithTimeout(addr, timeout=3, authkey=key)),
        ("ClientWithTimeout fails after 3s when listener doesn't accept connections.",
        False, "ClientWithTimeout", lambda: ClientWithTimeout(addr, timeout=3, authkey=key)),
        ("Client succeeds when the listener accepts connections.",
        True, "Client", lambda: Client(addr, authkey=key)),
        ("Client hangs when the listener doesn't accept connections (use ctrl-C to stop).",
        False, "Client", lambda: Client(addr, authkey=key))]:

    print("Expected result:", description)

    with ListenerThread(accept):
        start_time = time()
        try:
            print("Creating connection using %s... " % name)
            client = function()
            print("Client created:", client)
        except Exception as e:
            print("Failed:", e)
        print("Time elapsed: %f seconds" % (time() - start_time))

    print()

Running this on Linux produces the following output:

Expected result: ClientWithTimeout succeeds when the listener accepts connections.
Starting listener, accepting connections
Creating connection using ClientWithTimeout... 
Client created: <multiprocessing.connection.Connection object at 0x7fad536884e0>
Time elapsed: 0.003276 seconds
Stopped listener

Expected result: ClientWithTimeout fails after 3s when listener doesn't accept connections.
Starting listener, not accepting connections
Creating connection using ClientWithTimeout... 
Failed: [Errno 11] Resource temporarily unavailable
Time elapsed: 3.157268 seconds
Stopped listener

Expected result: Client succeeds when the listener accepts connections.
Starting listener, accepting connections
Creating connection using Client... 
Client created: <multiprocessing.connection.Connection object at 0x7fad53688c50>
Time elapsed: 0.001957 seconds
Stopped listener

Expected result: Client hangs when the listener doesn't accept connections (use ctrl-C to stop).
Starting listener, not accepting connections
Creating connection using Client... 
^C
Stopped listener
Martin L
  • Did you test it? If possible and not too lengthy could you add the code for testing it? – wwii Sep 13 '19 at 13:50
  • Excellent, thnx for the extra. Hopefully OP will be able to use it. – wwii Sep 14 '19 at 14:08
  • Thanks, this helped loads! I tweaked this a little and 1) added a send timeout (as this could block just as well), and 2) set both options before `s.connect` (as this is the blocking function when sending). You can set the send timeout similarly: `s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDTIMEO, timeval)` – Max Sep 09 '21 at 08:20
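
The tweak described in the last comment might look roughly like the sketch below. It is not Max's actual code: client_with_timeouts is a hypothetical name, the timeval layout assumes a POSIX platform with two native longs, and whether SO_SNDTIMEO also bounds connect() depends on the operating system.

```python
import socket
import struct
from multiprocessing.connection import (
    Connection, answer_challenge, deliver_challenge)


def client_with_timeouts(address, authkey, timeout):
    """Like ClientWithTimeout, but with receive and send timeouts set before connect()."""
    seconds = int(timeout)
    microseconds = int((timeout - seconds) * 1e6)
    # Assumption: struct timeval is two native longs on this platform.
    timeval = struct.pack("@ll", seconds, microseconds)

    with socket.socket(socket.AF_INET) as s:
        # Set both options before connecting, as the comment suggests.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, timeval)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDTIMEO, timeval)
        s.connect(address)
        c = Connection(s.detach())

    # Same handshake as in the answer; it fails if a socket timeout occurs.
    answer_challenge(c, authkey)
    deliver_challenge(c, authkey)
    return c
```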