
I am developing a heuristic algorithm to find "good" solutions for an NP-hard (hence CPU-intensive) problem.

I am implementing my solution in Python (I agree it is not the best choice when speed is a concern, but that is how it is), and I am splitting the workload across many subprocesses, each one in charge of exploring a branch of the space of possible solutions.

To improve performance I would like to share some information gathered during the execution of each subprocess among all subprocesses. The obvious way to collect this information is in a dictionary whose keys are (frozen)sets of integers and whose values are lists (or sets) of integers. The shared dictionary must therefore be both readable and writable from each subprocess, but I can safely expect reads to be far more frequent than writes: a subprocess writes to the shared dict only when it finds something "interesting", while it reads the dict far more often, to check whether a candidate solution has already been evaluated by another process (and so avoid exploring the same branch twice or more). I do not expect the size of this dictionary to exceed 10 MB.
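Purely as an illustration of the shape of the data (these particular keys and values are made up), the shared structure would look something like this:

# Made-up example of the kind of mapping to be shared across subprocesses.
shared_dict = {
    frozenset({1, 4, 7}): [2, 9, 13],  # already-evaluated candidate -> related info
    frozenset({2, 5}): [11],
}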

At the moment I have implemented the shared dict using an instance of multiprocessing.Manager(), which takes care of handling concurrent accesses to the shared dictionary out of the box. However (according to what I have found), this way of sharing data is implemented using pipes between processes, which are a lot slower than plain shared memory (moreover, the dictionary must be pickled before being sent through the pipe and unpickled when it is received).

So far my code looks like this:

# main.py
import multiprocessing as mp
import os 

def worker(a, b, c, shared_dict):
    while condition:
        # do things
        # sometimes read from shared_dict to check whether a candidate solution
        # has already been evaluated by another process;
        # if not, evaluate it and store it in shared_dict together with some related info
        ...
    return worker_result


def main():
    with mp.Manager() as manager:
        # setup params a, b, c, ...
        # ...

        shared_dict = manager.dict()
        n_processes = os.cpu_count()
        with mp.Pool(processes=n_processes) as pool:
            async_results = [pool.apply_async(worker, (a, b, c, shared_dict)) for _ in range(n_processes)]
            results = [res.get() for res in async_results]

    # gather the overall result from the 'results' list

if __name__ == '__main__':
    main()

To avoid the overhead of the pipes I would like to use shared memory, but the Python standard library does not seem to offer a straightforward way to handle a dictionary in shared memory. As far as I know, the standard library only offers helpers to store data in shared memory for standard ctypes (with multiprocessing.Value and multiprocessing.Array), or gives you access to raw areas of shared memory.
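For reference, a minimal sketch of what those standard-library helpers look like (the sizes here are arbitrary); it shows why they do not map naturally onto a dictionary:

# Minimal sketch of the standard-library shared-memory helpers mentioned above.
import multiprocessing as mp
from multiprocessing import shared_memory

counter = mp.Value('i', 0)   # a single shared C int
buffer = mp.Array('d', 10)   # a fixed-size shared array of C doubles

# Raw shared memory: just a block of bytes, with no dict semantics on top of it.
shm = shared_memory.SharedMemory(create=True, size=1024)
shm.buf[:5] = b"hello"
shm.close()
shm.unlink()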

I do not want to implement my own hash table in a raw area of shared memory, since I am an expert neither in hash tables nor in concurrent programming. Instead, I am wondering whether there are other, faster solutions for my needs that do not require writing everything from scratch. For example, I have seen that the ray library allows reading data written in shared memory much faster than going through pipes; however, it seems that you cannot modify a dictionary once it has been serialized and written to a shared-memory area.

Any help?

Sirion
  • Does this answer your question? https://stackoverflow.com/a/6832693/8534196 – Andrew Eckart Jun 06 '21 at 11:20
  • You may also want to check out https://github.com/luizalabs/shared-memory-dict. – Andrew Eckart Jun 06 '21 at 11:21
  • @AndrewEckart: the answer you linked does exactly the same as my snippet above. Also, I had already found the shared-memory dict repository; if you look at the source code, you will see that it pickles and unpickles your data on every write/read, so I expect it to be quite slow. – Sirion Jun 06 '21 at 11:50
  • Implement a process that has only a simple dict. Then provide one pipe/queue for adding and one for retrieving elements with regard to that simple dict. Share those two pipes/queues with each of your processes. This way, only the elements have to be pickled. – Sebastian Jun 06 '21 at 12:57 (a sketch of this approach appears after these comments)
  • @Sebastian: thank you for the hint. When I need to save a key-value pair in the dict, it is clear to me that I just need to put the pair on a queue and the process that holds the dict will read and save the pair. However, it is not completely obvious to me how to retrieve a value given its key inside a subprocess: how do you suggest sending the key to the process that holds the dict? (Do I have to take some lock to prevent interference from other subprocesses?) – Sirion Jun 06 '21 at 20:57
  • For what it's worth, the BaseManager is a generalized facility that supports creating your own managed, sharable type *even across machine boundaries*. A server process is created that listens for requests over a socket on Unix or named pipes on Windows. I have always believed these to be akin to remote procedure calls. Granted, they are slow operations compared to updating something directly in shared memory. But managed objects could not possibly be passed back and forth between address spaces and work; what is being passed in the `apply_async` call is a *proxy* object. (more). – Booboo Jun 10 '21 at 19:34
  • I am not sure then if the scheme proposed by @Sebastian will be appreciably faster. – Booboo Jun 10 '21 at 19:35
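For what it's worth, here is a minimal sketch of the single "dict server" process that Sebastian suggests in the comments above. The names, the use of one shared request queue plus one response queue per worker, and the shutdown sentinel are all illustrative assumptions, not a vetted design:

# Illustrative sketch of Sebastian's suggestion: one process owns a plain dict;
# workers talk to it through queues, so only individual keys/values are pickled.
import multiprocessing as mp

def dict_server(request_q, response_qs):
    cache = {}  # the plain, non-shared dict owned by this process only
    while True:
        msg = request_q.get()
        if msg is None:  # shutdown sentinel
            break
        op, worker_id, key, value = msg
        if op == "put":
            cache[key] = value  # writes need no reply
        elif op == "get":
            response_qs[worker_id].put(cache.get(key))

def worker(worker_id, request_q, response_q):
    key = frozenset({1, 2, 3})  # made-up candidate solution
    request_q.put(("get", worker_id, key, None))
    already_seen = response_q.get()  # blocks until the server replies
    if already_seen is None:
        request_q.put(("put", worker_id, key, [42]))  # made-up "related info"

if __name__ == "__main__":
    n_workers = 2
    request_q = mp.Queue()
    response_qs = [mp.Queue() for _ in range(n_workers)]
    server = mp.Process(target=dict_server, args=(request_q, response_qs))
    server.start()
    procs = [mp.Process(target=worker, args=(i, request_q, response_qs[i]))
             for i in range(n_workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    request_q.put(None)
    server.join()

No lock is needed because each worker reads only its own response queue, and only the individual keys and values travel through the queues rather than the whole dict. Note, however, that nothing prevents two workers from both seeing "not yet evaluated" for the same key between their get and their subsequent put, so some duplicated work is still possible.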

1 Answer


Unfortunately, shared memory in Ray must be immutable. Typically, it is recommended that you use actors for mutable state (see here).

You can do a couple of tricks with actors. For example, you can store object references in your dict if the values are immutable. Then the dict itself won't be in shared memory, but all of its values would be.

import ray
import numpy as np

ray.init()  # start a local Ray instance

@ray.remote
class DictActor:
    def __init__(self):
        self._dict = {}

    def put(self, key, value):
        self._dict[key] = ray.put(value)

    def get(self, key):
        return self._dict[key]

d = DictActor.remote()
ray.get(d.put.remote("a", np.zeros(100)))
ray.get(d.get.remote("a"))  # This result is in shared memory.
Alex