15

I have a simple UDPServer, which works with multiprocessing.

I want to create a list, that contains information about all clients.

I use a Manager, but I don't understand how to append information to the list — I need to pass the Manager's object to the handler, but how? My approach of setting a new attribute does not work.

import multiprocessing
from socketserver import UDPServer, ForkingMixIn, DatagramRequestHandler
from socket import socket, AF_INET, SOCK_DGRAM
from settings import host, port, number_of_connections

class ChatHandler(DatagramRequestHandler):
    """UDP handler that appends each client's address to a shared list proxy."""

    def handle(self):
        # ForkingMixIn forks per request, so this runs in a child process.
        cur_process = multiprocessing.current_process()
        data = self.request[0].strip()  # datagram payload
        socket = self.request[1]        # reply socket (shadows the imported name)
        ChatHandler.clients.append(self.client_address) # error here
        print(ChatHandler.clients)


class ChatServer(ForkingMixIn, UDPServer):
    """UDP server that forks a new process for every incoming datagram."""
    pass


if __name__ == '__main__':
    server = ChatServer((host, port), ChatHandler)
    # Shared list proxy, stored as a class attribute so handlers can reach it.
    ChatHandler.clients = multiprocessing.Manager().list()
    server_process = multiprocessing.Process(target=server.serve_forever)
    server_process.daemon = False
    server_process.start()
    # NOTE: the main process falls off the end here; the Manager's finalizer
    # then shuts the server down, which is what breaks the shared list.

How to fix that? Thanks!

Output:

Exception happened during processing of request from ('127.0.0.1', 55679)
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 724, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 584, in process_request
    self.finish_request(request, client_address)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 344, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 665, in __init__
    self.handle()
  File "server.py", line 15, in handle
    ChatHandler.clients.append(self.client_address)
  File "<string>", line 2, in append
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 728, in _callmethod
    self._connect()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 715, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 495, in Client
    c = SocketClient(address)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 624, in SocketClient
    s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory
Leon
  • 6,316
  • 19
  • 62
  • 97

5 Answers5

21

The problem is that you're letting the main process finish its execution immediately after you start the worker process. When the process that created the multiprocessing.Manager finishes its execution, the Manager server gets shut down, which means your shared list object is now useless. This happens because the Manager object registers its shutdown function as a "finalizer" with the multiprocessing module, which means it will be run just before the process exits. Here's the code that registers it, in BaseManager.__init__:

    # register a finalizer
    self._state.value = State.STARTED
    self.shutdown = util.Finalize(
        self, type(self)._finalize_manager,
        args=(self._process, self._address, self._authkey,
              self._state, self._Client),
        exitpriority=0
        )

Here's the code that actually does the shut down:

@staticmethod
def _finalize_manager(process, address, authkey, state, _Client):
    '''
    Shutdown the manager process; will be registered as a finalizer
    '''
    if process.is_alive():
        util.info('sending shutdown message to manager')
        try:
            # Ask the manager server process to shut itself down cleanly.
            conn = _Client(address, authkey=authkey)
            try:
                dispatch(conn, None, 'shutdown')
            finally:
                conn.close()
        except Exception:
            pass

        process.join(timeout=1.0)
        if process.is_alive():
            util.info('manager still alive')
            # Fall back to a hard terminate if the clean shutdown stalled.
            if hasattr(process, 'terminate'):
                util.info('trying to `terminate()` manager process')
                process.terminate()
                process.join(timeout=0.1)
                if process.is_alive():
                    util.info('manager still alive after terminate')

    state.value = State.SHUTDOWN
    try:
        del BaseProxy._address_to_local[address]
    except KeyError:
        pass

The fix is simple - don't let the main process complete immediately after you start the process that runs the UDP server, by calling server_process.join():

import multiprocessing
from socketserver import UDPServer, ForkingMixIn, DatagramRequestHandler
from socket import socket, AF_INET, SOCK_DGRAM
from settings import host, port, number_of_connections

class ChatHandler(DatagramRequestHandler):
    """UDP handler that appends each client's address to a shared list proxy."""

    def handle(self):
        # Runs in a forked child process (ForkingMixIn).
        cur_process = multiprocessing.current_process()
        data = self.request[0].strip()  # datagram payload
        socket = self.request[1]        # reply socket (shadows the imported name)
        ChatHandler.clients.append(self.client_address) # works once main stays alive
        print(ChatHandler.clients)


class ChatServer(ForkingMixIn, UDPServer):
    """UDP server that forks a new process for every incoming datagram."""
    pass


if __name__ == '__main__':
    server = ChatServer((host, port), ChatHandler)
    # Shared list proxy created by a Manager owned by this (main) process.
    ChatHandler.clients = multiprocessing.Manager().list()
    server_process = multiprocessing.Process(target=server.serve_forever)
    server_process.daemon = False
    server_process.start()
    # Keeping the main process alive keeps the Manager (and the list) alive.
    server_process.join() # This fixes the issue.
dano
  • 91,354
  • 19
  • 222
  • 219
  • I have same issue, but i`m joining children already. My code works on my pc, but fails on another. – eri Dec 07 '17 at 10:41
  • I don't get it. I thought that non-daemon processes were joined implicitly at the end of their parent process so there was no need for your last line. – Géry Ogam Jun 18 '19 at 08:06
  • 1
    @Maggyero that's true, but the Manager registers a handler that makes it start shutting down when the main process reaches the end of its execution. So while the main process will not exit until the Manager process does, the Manager process will immediately start shutting down, so anything that depends on it will break. – dano Jun 18 '19 at 11:45
  • @dano I am facing a weird issue. I always `join()` my processes but I keep getting this _connection_ error when trying to manipulate a shared list. Why is that? Even more weirder, I even sometimes get the same error when I am trying to initialize a manager object by doing `manager=Manager()` before I run the processes that manipulates the shared list. – Amir Aug 15 '19 at 17:03
  • @dano Just to clarify, the error I get when initializing the Manager object is not the same as the connection error described above. It's an EOFError, which is very weird ... . I opened a separate question for this issue [here](https://stackoverflow.com/questions/57517272/initializing-a-multiprocessing-manager-object-gives-eoferror-at-random-times). Would appreciate if you can take a look at it. – Amir Aug 15 '19 at 23:23
0

The following shows an example of a UDP server and a shared list.

  • parent code creates a Manager, a managed list, and passed it to start_server()

  • this function in turn actually starts the server, storing the shared list such that the server -- and its handler -- can access it

  • when a packet arrives, the handle() method is triggered. This accesses the server using self.server, and the shared list with self.server.client_list, an attribute on the ChatServer instance.

I did testing by starting the server, waiting a second, then sending a UDP packet "beer" using the netcat command. For some reason it sends Xs first, and each output is duplicated. This is a bug, but the code should point you in the right direction.

source

import multiprocessing as mp, signal, sys
from SocketServer import (
    UDPServer, ForkingMixIn, DatagramRequestHandler
)

class ChatHandler(DatagramRequestHandler):
    # Handles one datagram; runs in a forked child process (ForkingMixIn).
    def handle(self):
        data,_socket = self.request
        curproc = mp.current_process()
        # Log which worker process received the packet and from whom.
        print '{}: {}'.format(
            curproc,
            dict(
                data_len=len(data), 
                data=data.strip(),
                client=self.client_address,
            ))
        # The shared list is stored on the server instance (see start_server),
        # so every handler can reach it through self.server.
        self.server.client_list.append(
            self.client_address)
        print('{}: {}'.format(
            curproc,
            dict(client_list=self.server.client_list),
        ))

class ChatServer(ForkingMixIn, UDPServer):
    # Shared client list; injected by start_server() before serve_forever().
    client_list = None

def start_server(client_list):
    """Run a ChatServer on UDP port 9876, exposing *client_list* to handlers."""
    chat_server = ChatServer(('', 9876), ChatHandler)
    chat_server.client_list = client_list  # handlers reach it via self.server
    chat_server.serve_forever()

if __name__ == '__main__':
    # The managed list must outlive this block: the Manager is owned by the
    # main process, so the main process has to stay alive (see signal calls).
    clist = mp.Manager().list()
    mp.Process(
        target=start_server, args=[clist],
        name='udpserver',
    ).start()

    signal.alarm(5)             # die in 5 seconds
    signal.pause()              # wait for control-C or alarm

test run

(sleep 1 ; echo beer | nc -vvu localhost 9876 ) &
python ./mshared.py

<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
Connection to localhost 9876 port [udp/*] succeeded!
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'beer', 'client': ('127.0.0.1', 49399), 'data_len': 5}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
johntellsall
  • 14,394
  • 4
  • 46
  • 40
  • This code doesn't work for me in Python 2 or Python 3. I get the same errors as the OP. What platform are you running this one? – dano Aug 22 '14 at 21:48
  • Python 2.7.5+ on Linux Mint 16 Petra. – johntellsall Aug 22 '14 at 21:53
  • Ah, I had removed the last two `signal` lines from your code when I ran it, which exposed same the root issue that the OP's example has. – dano Aug 22 '14 at 22:35
  • @dano do you think explicitly `join()`ing the processes is better than just waiting for a signal with `signal.pause()`? I thought that if procs are started, then the latter is a brief version of the former, but I haven't traced the code. – johntellsall Aug 23 '14 at 12:43
  • 1
    I'd favor `proc.join()` for two reasons: 1) `signal.pause()` isn't available on Windows 2) Should `proc` either exit normally or exit due to some failure, the main process will also exit. With `signal.pause()`, the main process will continue to run until it's explicitly killed via a signal or a Ctrl+C, even if the child has exited. – dano Aug 23 '14 at 18:40
  • Both good reasons -- thanks @dano. I've posted this on my blog and thanked you for the insights! http://johntellsall.blogspot.com/2014/08/best-practices-with-pythons.html – johntellsall Aug 23 '14 at 19:45
0

If you're using it in the following way, you might need to look at the length of the list you're passing (or the hardcoded worker count) — it might exceed your machine's capability:

        # One worker per list item -- can easily exceed the machine's limits.
        pool = Pool(len(somelist))
        # call the function 'somefunction' in parallel for each somelist.
        pool.map(somefunction, somelist)

I reduced the number of workers, which resolved the issue for me.

Devopsception
  • 397
  • 1
  • 4
  • 10
0

If you can't use the manager for whatever reason, you can also implement one on your own that fits your needs.

My unittest was configured to stop all child processes that are left over if they are not properly shut down as expected, which destroyed the manager. So I needed something that can be arbitrarily started and stopped without bothering the tests.

import atexit
import logging
import multiprocessing
import select

class SharedDict:
    """Share a dictionary across processes without a multiprocessing.Manager.

    A dedicated child process owns the dict and serves 'get'/'set'/'stop'
    requests over a Pipe, so it can be stopped and restarted arbitrarily
    (the contents are lost on restart, but the pipe fds stay valid).
    """

    def __init__(self):
        """Create the pipe and start the managing process."""
        super().__init__()
        self.pipe = multiprocessing.Pipe()
        self.process = None
        atexit.register(self._stop)
        self._start()

    def _start(self):
        """Ensure the process that manages the dictionary is running."""
        if self.process is not None and self.process.is_alive():
            return

        # If the manager has already been running in the past but stopped
        # for some reason, the previous dictionary contents are lost.
        self.process = multiprocessing.Process(target=self.manage)
        self.process.start()

    def manage(self):
        """Serve read and write requests for the dictionary until told to stop."""
        # Fix: the original referenced an undefined name `logger` (with a
        # non-stdlib `.spam` level), which raised NameError on the first
        # message and silently killed the manager process. Use stdlib logging.
        log = logging.getLogger(__name__)
        shared_dict = dict()
        while True:
            message = self.pipe[0].recv()
            log.debug('SharedDict got %s', message)

            if message[0] == 'stop':
                return

            if message[0] == 'set':
                shared_dict[message[1]] = message[2]

            if message[0] == 'get':
                self.pipe[0].send(shared_dict.get(message[1]))

    def _stop(self):
        """Ask the managing process to exit."""
        self.pipe[1].send(('stop',))

    def get(self, key):
        """Get a value from the dictionary (None if missing or on timeout)."""
        return self.__getitem__(key)

    def __setitem__(self, key, value):
        self.pipe[1].send(('set', key, value))

    def __getitem__(self, key):
        self.pipe[1].send(('get', key))

        # Avoid blocking forever if the manager died or is unresponsive.
        select.select([self.pipe[1]], [], [], 0.1)
        if self.pipe[1].poll():
            return self.pipe[1].recv()

        return None

    def __del__(self):
        self._stop()


# Module-level singleton: importing this module starts the manager process.
shared_dict = SharedDict()

You can extend this with all sorts of methods, and you can stop and restart it whenever you like (though the dict will be lost each time). The pipes will remain the same all the time, so all child processes can also talk to the restarted manager without the need for new pipe fds.

I might extend this stuff with more functionality. If I didn't move that class into its own module in the meantime, it can be found at https://github.com/sezanzeb/key-mapper/blob/main/keymapper/injection/macros.py

sezanzeb
  • 816
  • 8
  • 20
0

You can use Python's native `multiprocessing.shared_memory` module (Python 3.8+)

Or this:

import multiprocessing

def worker1(l):
    """Append 1 to the shared list."""
    l.append(1)

def worker2(l):
    """Append 2 to the shared list."""
    l.append(2)

if __name__ == '__main__':
    # Guarding the Manager/Process creation is required for the 'spawn'
    # start method (Windows/macOS), where the module is re-imported.
    manager = multiprocessing.Manager()
    shared_list = manager.list()

    process1 = multiprocessing.Process(
        target=worker1, args=[shared_list])
    process2 = multiprocessing.Process(
        target=worker2, args=[shared_list])

    process1.start()
    process2.start()
    # Join so the parent (and thus the Manager) stays alive until done.
    process1.join()
    process2.join()

    # Fix: `print shared_list` was Python 2 syntax (SyntaxError on Python 3).
    print(shared_list)