
In a Nutshell

I want to change complex python objects concurrently, whereby each object is processed by a single process only. How can I do this (most efficiently)? Would implementing some kind of pickling support help? Would that be efficient?

Full Problem

I have a python data structure ArrayDict that basically consists of a numpy array and a dictionary and maps arbitrary indices to rows in the array. In my case, all keys are integers.

a = ArrayDict()

a[1234] = 12.5
a[10] = 3

print(a[1234])                               # 12.5
print(a[10])                                 # 3.0

print(a[1234] == a.array[a.indexDict[1234]]) # True
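
A stripped-down sketch of the core of the class looks roughly like this (the growth strategy and the fixed initial capacity are simplifications; the real class has more state):

import numpy as np

class ArrayDict:
    """Minimal sketch: maps integer keys to rows of an internal numpy array."""

    def __init__(self, capacity=8):
        self.array = np.zeros(capacity)   # internal value storage
        self.indexDict = {}               # key -> row index into self.array

    def __setitem__(self, key, value):
        if key not in self.indexDict:
            if len(self.indexDict) >= len(self.array):
                # grow the internal array when it becomes too small
                self.array = np.concatenate([self.array, np.zeros(len(self.array))])
            self.indexDict[key] = len(self.indexDict)
        self.array[self.indexDict[key]] = value

    def __getitem__(self, key):
        return self.array[self.indexDict[key]]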

Now I have multiple such ArrayDicts and want to fill them in myMethod(arrayDict, params). Since myMethod is expensive, I want to run it in parallel. Note that myMethod may add many rows to arrayDict. Every process alters its own ArrayDict. I do not need concurrent access to the ArrayDicts.

In myMethod, I change entries in the arrayDict (that is, I change the internal numpy array), and I add entries to the arrayDict (that is, I add another index to the dictionary and write a new value to the internal array). Eventually, I would like to be able to exchange arrayDict's internal numpy array when it becomes too small. This does not happen often, and I could perform this step in the non-parallel part of my program if no better solution exists. My own attempts were not successful even without the array exchange.
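
To make the access pattern concrete, a simplified stand-in for myMethod could look like this (the structure of params is only an illustration; the real method is far more expensive):

def myMethod(arrayDict, params):
    # illustration only: change existing entries and add new ones
    for key, value in params:
        if key in arrayDict.indexDict:
            arrayDict[key] += value   # modify a row of the internal array
        else:
            arrayDict[key] = value    # new key: extends the dict and the array
    return arrayDict                  # returned so the caller can keep the changes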

I have spent days researching shared memory and Python's multiprocessing module. Since I will ultimately be working on Linux, the task seemed rather simple: the system call fork() lets child processes work with copies of the parent's data efficiently. My plan was therefore to change each ArrayDict in its own process, return the changed version of the object, and overwrite the original object. To save memory and avoid the work of copying, I additionally used sharedmem arrays to store the data in ArrayDict. I am aware that the dictionary must still be copied.

from sharedmem import sharedmem
import numpy as np

n = ...                   # length of the data array
myData = np.empty(n, dtype=object)
myData[:] = [ArrayDict() for _ in range(n)]
done = False

while not done:
    considered = ...      # numpy boolean array of length
                          # n with True at the index of
                          # considered data
    args = ...            # numpy array containing arguments
                          # for myMethod

    with sharedmem.MapReduce() as pool:
        results = pool.map(myMethod, 
                           list(zip(myData[considered], 
                                    args[considered])),
                           star=True)
        myData[considered] = results

    done = ...             # depends on what happens in
                           # myMethod

What I get is a segmentation fault. I was able to circumvent this error by passing deep copies of the ArrayDicts to myMethod and saving them back into myData. I do not really understand why this is necessary, and frequently copying my (potentially very large) arrays (the while loop takes long) does not seem efficient to me. However, it worked to a certain extent. Nevertheless, my program shows buggy behaviour in the 3rd iteration due to the shared memory, so I think my approach is not optimal.

I read here and here that it is possible to store arbitrary numpy arrays in shared memory using multiprocessing.Array. However, I would still need to share the whole ArrayDict, which includes in particular a dictionary, which in turn is not picklable.
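
As far as I understand those answers, the pattern for the array part alone would be roughly the following (a sketch that relies on fork, so the children inherit the shared buffer; it does not cover the dictionary):

import multiprocessing as mp
import numpy as np

# a flat double array in shared memory; lock-free because every process
# only touches its own part of the data
shared = mp.Array('d', 1000, lock=False)
values = np.frombuffer(shared, dtype=np.float64)   # numpy view, no copy

def worker(i):
    values[i] = i * 0.5            # children write directly into the shared buffer

if __name__ == '__main__':
    with mp.Pool(4) as pool:
        pool.map(worker, range(1000))
    print(values[:5])              # the children's writes are visible in the parent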

How could I achieve my goals in an efficient way? Would it be possible (and efficient) to make my object picklable somehow?
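
Regarding the pickling question: as far as I can tell, objects whose attributes are plain dicts and numpy arrays usually pickle out of the box; if not, __getstate__/__setstate__ can specify exactly what crosses the process boundary. A minimal sketch of what I have in mind (the class name is only illustrative):

import numpy as np

class PicklableArrayDict:
    def __init__(self):
        self.array = np.zeros(8)
        self.indexDict = {}

    def __getstate__(self):
        # only plain, picklable data crosses the process boundary
        return {'array': self.array, 'indexDict': self.indexDict}

    def __setstate__(self, state):
        self.array = state['array']
        self.indexDict = state['indexDict']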

All solutions must run with Python 3 and full numpy/scipy support on 64-bit Linux.

Edit

I found here that it is somehow possible to share arbitrary objects using multiprocessing "Manager" classes and user-defined proxy classes. Will this be efficient? I would like to exploit the fact that I do not need concurrent access to the objects, even though they are not handled in the main process. Would it be an option to create a manager for each object that I want to process? (I might still have some misconceptions about how managers work.)
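
If I understand it correctly, registering the class with a BaseManager would look roughly like the sketch below (the tiny stand-in class and the list of exposed methods are my own guesses; by default, dunder methods would not be proxied):

from multiprocessing.managers import BaseManager
import numpy as np

class ArrayDict:
    # tiny stand-in for the real class, just for this sketch
    def __init__(self):
        self.array = np.zeros(8)
        self.indexDict = {}
    def __setitem__(self, key, value):
        idx = self.indexDict.setdefault(key, len(self.indexDict))
        self.array[idx] = value
    def __getitem__(self, key):
        return self.array[self.indexDict[key]]

class ArrayDictManager(BaseManager):
    pass

# expose the special methods explicitly; otherwise the proxy only offers
# ordinary public methods
ArrayDictManager.register('ArrayDict', ArrayDict,
                          exposed=['__getitem__', '__setitem__'])

if __name__ == '__main__':
    with ArrayDictManager() as manager:
        ad = manager.ArrayDict()   # a proxy; every call is forwarded to the manager process
        ad[10] = 3.5
        print(ad[10])              # 3.5

As far as I can tell, every single item access then goes through the manager process, which is part of why I am unsure about the efficiency.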

  • How do you modify or use the arrayDict in myMethod? (I assume you mean `myMethod` not `myFunc`?) – gauteh Dec 19 '16 at 17:03
  • @gauteh: Thanks for making me aware of the typo. I corrected it. I also added a description of how I modify arrayDict in myMethod. – Samufi Dec 19 '16 at 17:16
  • Is it crucial that ArrayDict can take arbitrary key types? Otherwise the class might be restructured to use types that can be shared easily across processes without using a manager. As it is now, using a manager seems to be the best choice since the problem is somewhat complex. The performance loss might not be significant. – gauteh Dec 19 '16 at 17:18
  • @gauteh: I only work with integer keys and do not require a solution that works with arbitrary keys. However, I would still be interested in how a solution would look if arbitrary keys had to be supported. `ArrayDict` does of course have more object variables than I listed: e.g. a set of deleted indices. However, if I have a solution for the dict, I hope to be able to do the rest myself. – Samufi Dec 19 '16 at 17:32

1 Answer


This seems like a fairly complex class, and I am not able to completely anticipate if this solution will work in your case. A simple compromise for such a complex class is to use ProcessPoolExecutor.

If this does not answer your question, it would help to have a minimal, working example.

from concurrent.futures import ProcessPoolExecutor

import numpy as np

class ArrayDict ():
  keys = None
  vals = None

  def __init__ (self):
    self.keys = dict ()
    self.vals = np.random.rand (1000)

  def __str__ (self):
    return "keys: " + str(self.keys) + ", vals: " + str(self.vals.mean())

def myMethod (ad, args):
  print ("starting:", ad)
  # modify ad here; return it if the parent process should see the changes


if __name__ == '__main__':
  l     = [ArrayDict() for _ in range (5)]
  args  = [2, 3, 4, 1, 3]

  with ProcessPoolExecutor (max_workers = 2) as ex:

    # each (ArrayDict, args) pair is pickled and sent to a worker process
    d = ex.map (myMethod, l, args)

The objects are cloned when they are sent to the child processes, so you need to return the result (changes to the object will not propagate back to the main process) and handle how you want to store them.
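
For example, reusing the ArrayDict class from above, that pattern might look like this (the modification in myMethod is only illustrative):

def myMethod (ad, args):
  ad.keys[args] = True             # modify the worker's copy of the object
  return ad                        # send the modified copy back to the parent

if __name__ == '__main__':
  l    = [ArrayDict() for _ in range (5)]
  args = [2, 3, 4, 1, 3]

  with ProcessPoolExecutor (max_workers = 2) as ex:
    # overwrite the originals with the returned, modified copies
    l = list (ex.map (myMethod, l, args))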

Note that changes to class variables will propagate to other objects in the same process, e.g. if you have more tasks than processes, changes to class variables will be shared among the instances running in the same process. This is usually undesired behavior.
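
A small demonstration of this effect, restricted to a single worker process so that all tasks land in the same process (the Counter class is just for illustration):

from concurrent.futures import ProcessPoolExecutor

class Counter:
  hits = 0                         # class variable, shared by all instances in one process

  def bump (self):
    type(self).hits += 1
    return type(self).hits

def work (c):
  return c.bump ()

if __name__ == '__main__':
  counters = [Counter() for _ in range (4)]
  with ProcessPoolExecutor (max_workers = 1) as ex:
    # prints [1, 2, 3, 4] although these are four different instances;
    # the parent's Counter.hits stays 0
    print (list (ex.map (work, counters)))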

This is a high-level interface to parallelization. ProcessPoolExecutor uses the multiprocessing module and can only be used with picklable objects. I suspect that ProcessPoolExecutor has performance similar to "sharing state between processes". Under the hood, ProcessPoolExecutor uses multiprocessing.Process and should exhibit performance similar to Pool (except when using very long iterables with map). ProcessPoolExecutor does seem to be the intended future API for concurrent tasks in Python.

If you can, it is usually faster to use the ThreadPoolExecutor (which can simply be swapped in for the ProcessPoolExecutor). In this case the object is shared between the threads, and an update made in a worker thread will be visible in the main thread.
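
A sketch of the same example with threads (again reusing ArrayDict from above):

from concurrent.futures import ThreadPoolExecutor

def myMethod (ad, args):
  ad.keys[args] = True             # threads share memory, so this mutates the original object

if __name__ == '__main__':
  l    = [ArrayDict() for _ in range (5)]
  args = [2, 3, 4, 1, 3]

  with ThreadPoolExecutor (max_workers = 2) as ex:
    list (ex.map (myMethod, l, args))

  print (l[0])                     # the changes made in the worker threads are visible here

Keep in mind that pure-Python code in myMethod is still serialized by the GIL; threads mainly pay off when the heavy lifting happens in numpy/scipy routines that release it.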

As mentioned, the fastest option is probably to restructure ArrayDict so that it only uses objects that can be represented by multiprocessing.Value or Array.

If ProcessPoolExecutor does not work, and you cannot optimize ArrayDict, you may be stuck with using a Manager. There are good examples on how to do that here.

The greatest performance gain is often likely to be found in myMethod. And, as I mentioned, the overhead of using threads is less than that of processes.

  • Thanks for the answer! This works well and lets me even exchange the internal array of ArrayDict. Could you add some comments what is going on under the hood? What is the difference to multiprocessing.pool? Are the variables copied? I observed that integer properties of ArrayDict were not changed concurrently, but the entries of the dict were. How can I understand and predict that behaviour? Furthermore, I saw that your solution works in simple cases under Windows and Linux, but with the full ArrayDict only on Linux. Why? Is the overhead created in the "with" statement or in the "map" method? – Samufi Dec 21 '16 at 14:41
  • The `with` statement does not create any extra overhead. What do you mean by "not changed concurrently"? An example would be useful here. This might be related to immutable and mutable types. The answer was updated somewhat on the performance. – gauteh Dec 21 '16 at 17:28
  • The variables are copied (but the memory is shared until it is changed). You have to return the object from the worker process and replace the original object. Otherwise, use a manager. – gauteh Dec 21 '16 at 17:48
  • Thanks for the update! With "not changed concurrently" I mean that if the variable is changed in process A, this change does not affect the value of this variable in process B. That is, what happens in process A does not affect process B. If changes in mutable objects were visible in all processes, this would make it very easy to implement shared memory. However, in this case the programmer should also ensure that no dirty read and write actions occur. Therefore, it would be great to understand concurrent.futures' behaviour here. – Samufi Dec 21 '16 at 17:54
  • I do not think this can be happening, unless you explicitly use a system for shared memory. Note that if you have fewer processes than tasks, updates to a class variable will be visible to the tasks that are run on this process. This is probably undesired behavior anyway! If you use threads, the object is the same between the threads. – gauteh Dec 21 '16 at 17:57
  • You are right, in my case this was undesired behaviour. Nevertheless, it *did* happen! However, it only works, if the object is changed in the main process. Therefore, it does not allow for real shared memory. Anyway, I think it is important to know about that issue. – Samufi Dec 21 '16 at 18:57