
I am trying to:

  1. share a dataframe between processes
  2. update a shared dict based on calculations performed on (but not changing) that dataframe

I am using multiprocessing.Manager() to create a dict in shared memory (to store results) and a Namespace to store and share the dataframe that I want to read from.

import multiprocessing

import pandas as pd
import numpy as np


def add_empty_dfs_to_shared_dict(shared_dict, key):
    # Pre-populate the shared dict with an empty dataframe per key.
    shared_dict[key] = pd.DataFrame()


def edit_df_in_shared_dict(shared_dict, namespace, ind):
    # Read one row from the shared dataframe and store it under its key.
    # The fetched value must be reassigned for the change to propagate
    # back through the manager proxy.
    row_to_insert = namespace.df.loc[ind]
    df = shared_dict[ind]
    df[ind] = row_to_insert
    shared_dict[ind] = df


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    shared_dict = manager.dict()
    namespace = manager.Namespace()

    n = 100
    dataframe_to_be_shared = pd.DataFrame({
        'player_id': list(range(n)),
        'data': np.random.random(n),
    }).set_index('player_id')

    namespace.df = dataframe_to_be_shared

    for i in range(n):
        add_empty_dfs_to_shared_dict(shared_dict, i)

    jobs = []
    for i in range(n):
        p = multiprocessing.Process(
            target=edit_df_in_shared_dict,
            args=(shared_dict, namespace, i)
        )
        jobs.append(p)
        p.start()

    for p in jobs:
        p.join()

    print(shared_dict[1])

When I run the above, it writes to shared_dict correctly (my print statement executes and shows data), but I also get an error from the manager:

Process Process-88:
Traceback (most recent call last):
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 788, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/henrysorsky/Library/Preferences/PyCharm2019.2/scratches/scratch_13.py", line 34, in edit_df_in_shared_dict
    row_to_insert = namespace.df.loc[ind]
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 1099, in __getattr__
    return callmethod('__getattribute__', (key,))
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 792, in _callmethod
    self._connect()
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 779, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/connection.py", line 492, in Client
    c = SocketClient(address)
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/connection.py", line 619, in SocketClient
    s.connect(address)
ConnectionRefusedError: [Errno 61] Connection refused

I understand this is coming from the manager and seems to be due to it not shutting down properly. The only similar issue I could find online,

Share list between process in python server

suggests joining all the child processes, which I am already doing.

– statskyy
    In case it's of help to anyone, I was getting the `AttributeError: 'ForkAwareLocal' object has no attribute 'connection'` error too. The problem for me was that I was creating and using two `multiprocessing.Manager` objects (in separate pytest fixtures). Once I changed the structure to share the same manager, the problem resolved itself. – amin_nejad Dec 02 '20 at 22:52
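
For anyone hitting this in a test suite, a minimal sketch (with assumed fixture names) of the single-shared-manager structure that comment describes:

import multiprocessing

import pytest


@pytest.fixture(scope="session")
def manager():
    # One Manager shared by every fixture and test,
    # instead of a separate Manager per fixture.
    m = multiprocessing.Manager()
    yield m
    m.shutdown()


@pytest.fixture
def shared_dict(manager):
    return manager.dict()


@pytest.fixture
def namespace(manager):
    return manager.Namespace()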

3 Answers


So after a full night's sleep I realised it was actually the reading of the dataframe in shared memory that was causing issues: at around the 20th child process, some of them were failing this read. I added a maximum number of processes running at once and this solved it.

For anyone wondering, the code I used is:

import multiprocessing

import pandas as pd
import numpy as np

def add_empty_dfs_to_shared_dict(shared_dict, key):
    shared_dict[key] = pd.DataFrame()


def edit_df_in_shared_dict(shared_dict, namespace, ind):
    row_to_insert = namespace.df.loc[ind]
    df = shared_dict[ind]
    df[ind] = row_to_insert
    shared_dict[ind] = df


if __name__ == '__main__':
    # region define inputs

    max_jobs_running = 4
    n = 100

    # endregion

    manager = multiprocessing.Manager()
    shared_dict = manager.dict()
    namespace = manager.Namespace()

    dataframe_to_be_shared = pd.DataFrame({
        'player_id': list(range(n)),
        'data': np.random.random(n),
    }).set_index('player_id')

    namespace.df = dataframe_to_be_shared

    for i in range(n):
        add_empty_dfs_to_shared_dict(shared_dict, i)

    jobs = []
    jobs_running = 0
    for i in range(n):
        p = multiprocessing.Process(
            target=edit_df_in_shared_dict,
            args=(shared_dict, namespace, i)
        )
        jobs.append(p)
        p.start()

        jobs_running += 1

        # Hacky throttle: busy-wait until fewer than max_jobs_running
        # children are still alive before starting the next one.
        if jobs_running >= max_jobs_running:
            while jobs_running >= max_jobs_running:
                jobs_running = 0
                for p in jobs:
                    jobs_running += p.is_alive()

    for p in jobs:
        p.join()

    for key, value in shared_dict.items():
        print(f"key: {key}")
        print(f"value: {value}")
        print("-" * 50)

This would probably be better handled by a Queue and Pool setup rather than my hacky fix.
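
For example, a minimal sketch of that Pool approach (the 4-worker cap mirrors max_jobs_running above):

import multiprocessing

import pandas as pd
import numpy as np


def edit_df_in_shared_dict(shared_dict, namespace, ind):
    row_to_insert = namespace.df.loc[ind]
    df = shared_dict[ind]
    df[ind] = row_to_insert
    shared_dict[ind] = df


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    shared_dict = manager.dict()
    namespace = manager.Namespace()

    n = 100
    namespace.df = pd.DataFrame({
        'player_id': list(range(n)),
        'data': np.random.random(n),
    }).set_index('player_id')

    for i in range(n):
        shared_dict[i] = pd.DataFrame()

    # The Pool caps concurrency: at most 4 workers talk to the
    # manager at once, so its connection backlog never overflows.
    with multiprocessing.Pool(processes=4) as pool:
        pool.starmap(edit_df_in_shared_dict,
                     [(shared_dict, namespace, i) for i in range(n)])

    print(shared_dict[1])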

– statskyy
    Why does lowering the process count fix this issue? I'd like to fix this issue while maintaining 8 parallel processes! I'm having the same Exception (AttributeError: 'ForkAwareLocal') when one of my eight processes is appending data to a multiprocessing.Manager().list() instance (that's passed in as an argument into the process) – Omrii Dec 25 '21 at 23:50

The problem is probably in your main process, which created the shared dict. If you forget to call process.join() (or otherwise keep the main process alive), the main process may finish before the other processes that are using the dict. The dict is then destroyed along with the manager, and those processes can no longer connect to it.

The number of processes should not be a problem. You should be able to use the dict with as many as you wish.
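
To illustrate the failure mode (a minimal sketch; the worker and the sleep are made up for the example):

import multiprocessing
import time


def worker(shared_dict, key):
    time.sleep(1)          # still running after the main block finishes
    shared_dict[key] = 42  # without the join below, this fails with
                           # ConnectionRefusedError: the manager is gone


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    shared_dict = manager.dict()

    p = multiprocessing.Process(target=worker, args=(shared_dict, 0))
    p.start()
    # The join keeps the main process (and with it the manager's
    # server process) alive until the worker is done with the dict:
    p.join()
    print(dict(shared_dict))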

– Daniel

TL;DR: This error can happen if you initiate too many new connections to multiprocessing.Manager() objects in parallel, because of a hard-coded backlog limit (16 at the time of writing) in multiprocessing/managers.py:

        # do authentication later
        self.listener = Listener(address=address, backlog=16)
        self.address = self.listener.address

Details: I was starting a few hundred subprocesses that tried to get a value from a multiprocessing.Manager().dict() object at the very start of my program (so essentially all in parallel). The first few worked fine, but then they started to fail sporadically.

Interestingly, in my case this only happened under the VSCode debugger. I found a mailing list discussion mentioning this issue more than 10 years ago. Looking at the source code of multiprocessing, I found that the backlog limit is still hard-coded (it seems to have been increased from 5 to 16 in modern versions). I increased it to 64 and all the errors were gone.

So once the pending-connection queue reaches the limit, all new connections are refused. This is especially likely under a debugger, where connections are served a tick slower, so the backlog buffer can fill up when hundreds of connections arrive in parallel.
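
If patching the standard library is not an option, one workaround (a sketch, not something multiprocessing provides out of the box) is to stagger each child's first manager access with a shared semaphore, so only a few connection attempts are ever in flight at once:

import multiprocessing


def worker(shared_dict, gate, key):
    # A proxy connects lazily on its first use, so gating the first
    # access keeps concurrent connection attempts below the backlog.
    with gate:
        shared_dict[key] = key * key


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    shared_dict = manager.dict()
    gate = multiprocessing.Semaphore(8)  # comfortably under the backlog of 16

    jobs = [
        multiprocessing.Process(target=worker, args=(shared_dict, gate, i))
        for i in range(200)
    ]
    for p in jobs:
        p.start()
    for p in jobs:
        p.join()
    print(len(shared_dict))  # 200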

– Programmierus