
I have a very large (read only) array of data that I want to be processed by multiple processes in parallel.

I like the Pool.map function and would like to use it to calculate functions on that data in parallel.

I saw that one can use the Value or Array class to share memory between processes. But when I try this with Pool.map I get: RuntimeError: 'SynchronizedString objects should only be shared between processes through inheritance'.

Here is a simplified example of what I am trying to do:

from sys import stdin
from multiprocessing import Pool, Array

def count_it( arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  # this works
  print count_it( toShare, "a" )

  pool = Pool()

  # RuntimeError here
  print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )

Can anyone tell me what I am doing wrong here?

So what I would like to do is pass info about a newly created shared memory allocated array to the processes after they have been created in the process pool.

Jeroen Dirks
  • Unfortunately that's not possible. The recommended way according to the mp documentation is to use inheritance (on fork platforms). For read only data as you have here one would normally use a global, but you can use a shared Array for read/write communication. Forking is cheap so you can recreate the Pool whenever you receive the data, then close it afterwards. Unfortunately, on Windows this isn't possible - the workaround is to use a shared memory Array (even in the read only case) but this can only be passed to subprocesses at process creation (I imagine they need to be added to the access list... – robince Nov 13 '09 at 10:24
  • for the shared memory segment and that this logic isn't implemented except at subprocess startup). You can pass the shared data array at Pool start up as I showed, or to a Process in a similar way. You can't pass a shared memory Array to an open Pool - you have to create the Pool after the memory. Easy ways around this include allocating a maximum size buffer, or just allocating the array when you know the required size before starting the Pool. If you keep your global variables down, Pool shouldn't be too expensive on Windows either - global variables are automatically ... – robince Nov 13 '09 at 10:27
  • pickled and sent to the subprocesses - which is why my suggestion to make one buffer of sufficient size at the start (where hopefully your amount of global variables is small), then Pool, is better. I took the time to understand and solve your problem in good faith - before you edited your question - so while I understand if you want to let it run, I hope at the end you will consider accepting my answer if nothing substantially different/better comes along. – robince Nov 13 '09 at 10:30
  • I had a closer look at the source code: the information about the shared memory can be pickled (needed to get info about it over to the client process on Windows), but that code has an assert so it only runs during process spawning. I wonder why that is. – Jeroen Dirks Nov 13 '09 at 15:32

4 Answers


Trying again as I just saw the bounty ;)

Basically I think the error message means what it says: multiprocessing shared memory Arrays can't be passed as arguments (by pickling). It doesn't make sense to serialise the data - the point is that it is shared memory. So you have to make the shared array global. I think it's neater to put it as an attribute of a module, as in my first answer, but just leaving it as a global variable in your example also works well. Taking on board your point of not wanting to set the data before the fork, here is a modified example. If you wanted to have more than one possible shared array (which is why you wanted to pass toShare as an argument) you could similarly make a global list of shared arrays, and just pass the index to count_it (so the loop becomes for c in toShare[i]:) - there is a sketch of that variant after the example below.

from multiprocessing import Pool, Array

def count_it( key ):
  count = 0
  for c in toShare:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool()

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )
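
For the multiple-shared-arrays variant mentioned above (a global list of arrays indexed by an argument), here is a minimal sketch for fork platforms. The tuple argument and the shared_arrays name are my own, and for brevity the data is placed into the arrays before the fork rather than afterwards:

from multiprocessing import Pool, Array

# global list of shared arrays, created before the Pool so children inherit it
shared_arrays = []

def count_it( args ):
  i, key = args            # index into the global list, plus the key to count
  count = 0
  for c in shared_arrays[i]:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  shared_arrays.append(Array('c', "abcabcs bsdfsdf", lock=False))
  shared_arrays.append(Array('c', "gdfg dffdgdfg sdfsdfsd sdfdsfsdf", lock=False))

  pool = Pool()   # fork happens here

  # count "a" in the first array and "d" in the second
  print pool.map( count_it, [(0, "a"), (1, "d")] )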

[EDIT: The above doesn't work on Windows because it doesn't use fork. However, the below does work on Windows, still using Pool, so I think this is the closest to what you want:

from multiprocessing import Pool, Array
import mymodule

def count_it( key ):
  count = 0
  for c in mymodule.toShare:
    if c == key:
      count += 1
  return count

def initProcess(share):
  mymodule.toShare = share

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool(initializer=initProcess,initargs=(toShare,))

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

Not sure why map won't pickle the array but Process and Pool will - I think perhaps it has to be transferred at the point of subprocess initialization on Windows. Note that the data is still set after the fork though.]

robince
  • Even on platforms with fork you can not insert new shared data into toShare after the fork since each process will have its own independent copy at that point. – Jeroen Dirks Nov 12 '09 at 14:10
  • So the real problem seems to be how we can pickle the information about an Array so it can be sent to and connected from the other process. – Jeroen Dirks Nov 12 '09 at 14:41
  • @James - no that's not right. The array has to be set up before the fork, but then it is shared memory that can be changed, with changes visible across all children. Look at the example - I put the data into the array *after* the fork (which occurs when Pool() is instantiated). That data could be obtained at run time, after the fork, and as long as it fits into the preallocated shared memory segment it can be copied there and seen from all children. – robince Nov 12 '09 at 15:49
  • You can pickle the Array, but not using Pool. – jwilson Nov 12 '09 at 16:21
  • Edited to add a working Windows version, using only Pool (by passing the shared array as an initialization parameter). – robince Nov 12 '09 at 16:26
  • You are getting closer but there is still the issue that the toShare array length has to be fixed before the pool is created. So you are still creating the shared memory segment before the processes are created. What I really want to see as a general solution is a way to create a new variable length shared array after the pool is created, pass info about it to the worker process and have it read from it. – Jeroen Dirks Nov 12 '09 at 18:39
  • I'm afraid that isn't possible with Pool. You have to create the shared memory beforehand. – robince Nov 12 '09 at 18:48
  • In any case it seems an artificial requirement. If the new set of data is the wrong size for the current shared buffer - you can just close the pool (`pool.close()`), create a new shared array of the required size and open a new pool (see the sketch after these comments). For any computational tasks where using multiprocessing is worth it, the overhead of closing and opening the pool will be tiny. And the Pool operations are relatively atomic - so it is not like you could inject fresh data in the middle of a map command. – robince Nov 12 '09 at 19:19
  • The assert on pickling the shared data array seems to be an artificial constraint on using the shared resource with multi-processing but given that constraint you have provided some reasonable workarounds so I will give you the points for accepted answer. – Jeroen Dirks Nov 13 '09 at 20:05
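
To illustrate the recreate-the-pool workaround described in the comments above, here is a minimal sketch for fork platforms only. The helper name process_dataset and the two example strings are illustrative, not from the original answer:

from multiprocessing import Pool, Array

def count_it( key ):
  # reads the module-global shared array set up in process_dataset
  count = 0
  for c in toShare:
    if c == key:
      count += 1
  return count

def process_dataset( data ):
  # size the shared array for this dataset, then fork the pool so the
  # children inherit it (fork platforms only)
  global toShare
  toShare = Array('c', data, lock=False)
  pool = Pool()
  try:
    return pool.map( count_it, ["a", "b", "s", "d"] )
  finally:
    pool.close()
    pool.join()

if __name__ == '__main__':
  for data in ["abcabcs bsdfsdf", "gdfg dffdgdfg sdfsdfsd sdfdsfsdf"]:
    print process_dataset( data )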

If you're seeing:

RuntimeError: Synchronized objects should only be shared between processes through inheritance

Consider using multiprocessing.Manager, as it doesn't have this limitation. The manager presumably works because it runs in a separate process altogether.

import ctypes
import multiprocessing

# Put this in a method or function, otherwise it will run on import from each module:
manager = multiprocessing.Manager()
counter = manager.Value(ctypes.c_ulonglong, 0)
counter_lock = manager.Lock()  # pylint: disable=no-member

with counter_lock:
    counter.value = count = counter.value + 1
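
As a sketch of how this could be applied to the original counting problem, assuming (as is usually the case) that a manager proxy can be pickled into the pool workers - the shared name and the slice copy are illustrative:

import multiprocessing
from multiprocessing import Pool

def count_it( args ):
  shared, key = args   # the manager proxy is pickled and sent to the worker
  data = shared[:]     # one round trip to the manager to copy the data locally
  return data.count(key)

if __name__ == '__main__':
  manager = multiprocessing.Manager()
  # the manager holds the data in its own server process;
  # workers reach it through the proxy
  shared = manager.list("abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf")

  pool = Pool()
  print pool.map( count_it, [(shared, key) for key in ["a", "b", "s", "d"]] )

Note that every access to the proxy is a round trip to the manager process, so for a large read only array the fork / shared Array approaches above should be faster.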
Asclepius
  • this was the only suggestion I actually got working when using a `multiprocessing.Pool` ... and I did not need the explicit treatment of `manager.Lock` – raphael Mar 25 '20 at 08:15
  • @raphael Are you asserting that the Value has an implicit lock? The explicit lock is there to prevent a race condition, and thereby prevent erroneous counts when updating the count from multiple processes. – Asclepius Oct 30 '20 at 13:26

If the data is read only, just make it a variable in a module before the fork triggered by Pool. Then all the child processes should be able to access it, and it won't be copied provided you don't write to it.

from multiprocessing import Pool
import myglobals # anything (empty .py file)
myglobals.data = []

def count_it( key ):
    count = 0
    for c in myglobals.data:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
    myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"

    pool = Pool()
    print pool.map( count_it, ["a", "b", "s", "d"] )

If you do want to try to use Array though, you could try it with the lock=False keyword argument (it is True by default).

robince
  • I do not believe the use of globals is safe and would certainly not work on Windows where the processes are not forked. – Jeroen Dirks Nov 04 '09 at 20:41
  • How is it not safe? If you only need read access to the data it is fine. If you write to it by mistake, then the modified page will be copied on write for the child process so nothing bad will happen (it wouldn't interfere with other processes, for example). You're right it won't work on Windows though... – robince Nov 04 '09 at 20:59
  • You are right that it is safe on fork based platforms. But I would like to know if there is a shared memory based way to share large amounts of data after the process pool is created. – Jeroen Dirks Nov 04 '09 at 21:00

The problem I see is that Pool doesn't support pickling shared data through its argument list. That's what the error message means by "objects should only be shared between processes through inheritance". The shared data needs to be inherited, i.e., global if you want to share it using the Pool class.

If you need to pass the data explicitly, you may have to use multiprocessing.Process. Here is your reworked example:

from multiprocessing import Process, Array, Queue

def count_it( q, arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  q.put((key, count))

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  q = Queue()
  keys = ['a', 'b', 's', 'd']
  workers = [Process(target=count_it, args = (q, toShare, key))
    for key in keys]

  for p in workers:
    p.start()
  for p in workers:
    p.join()
  while not q.empty():
    print q.get(),

Output: ('s', 9) ('a', 2) ('b', 3) ('d', 12)

The ordering of elements of the queue may vary.

To make this more generic and similar to Pool, you could create a fixed number N of Processes, split the list of keys into N pieces, and then use a wrapper function as the Process target, which calls count_it for each key in the list it is passed, like:

def wrapper( q, arr, keys ):
  for k in keys:
    count_it(q, arr, k)
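
A minimal sketch of that generic version, reusing count_it and the Queue from the example above (the choice of N and the round-robin split of the keys are arbitrary):

from multiprocessing import Process, Array, Queue

def count_it( q, arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  q.put((key, count))

def wrapper( q, arr, keys ):
  # each worker processes its own slice of the key list
  for k in keys:
    count_it(q, arr, k)

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  toShare = Array('c', testData)
  q = Queue()

  N = 2                                      # fixed number of worker processes
  keys = ['a', 'b', 's', 'd']
  chunks = [keys[i::N] for i in range(N)]    # split keys into N pieces

  workers = [Process(target=wrapper, args=(q, toShare, chunk))
             for chunk in chunks]
  for p in workers:
    p.start()
  for p in workers:
    p.join()
  while not q.empty():
    print q.get(),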
jwilson