I have a program similar to the following:

import time
from multiprocessing import Pool

class a_system():
    def __init__(self,N):
        self.N = N
        self.L = [0 for _ in range(self.N)]
    def comp(self,n):
        self.L[n] = 1
        return self.L
    def reset(self):
        self.L = [0 for _ in range(self.N)]

def individual_sim(iter):
    global B, L, sys
    sys.reset()
    L[iter] = sys.comp(iter)
    B += sum(L[iter])
    time.sleep(1)
    return L, B

def simulate(N_mc):
    global B, L, sys
    L = [[] for _ in range(N_mc)]
    B = 0
    sys = a_system(N_mc)
    [*map(individual_sim, range(N_mc))]
    # with Pool() as P:
    #     P.map(individual_sim,range(N_mc))
    return L, B

if __name__=="__main__":
    start = time.time()
    L, B = simulate(N_mc=5)
    print(L)
    print(B)
    print("Time elapsed: ",time.time()-start)

Here I would like to parallelise the line [*map(individual_sim, range(N_mc))] with multiprocessing. However, replacing this line with

with Pool() as P:
    P.map(individual_sim,range(N_mc))

returns an empty list of lists.

If instead I use P.map_async, P.imap, or P.imap_unordered, I don't get an error, but the list and B are left empty.

How can I parallelise this code?

P.S. I have tried ThreadPool from multiprocessing.pool, but I would like to avoid that, because the class a_system, which is a bit more complicated than the one shown here, needs to have a different copy for each worker (I get an exit code 139 (interrupted by signal 11: SIGSEGV)).

P.S.2 I might try to use sharedctypes or Managers (?), but I'm not sure how they work, nor which one I should use (or a combination?).

P.S.3 I have also tried modifying individual_sim as

def individual_sim(iter,B,L,sys):
    sys.reset()
    L[iter] = sys.comp(iter)
    B += sum(L[iter])
    time.sleep(1)
    return L, B

and using the following in simulate:

from functools import partial
part_individual_sim = partial(individual_sim, B=B, L=L, sys=sys)
with Pool() as P:
    P.map(part_individual_sim,range(N_mc))

But I still get empty lists.

Darkonaut
semola
  • Possible duplicate of [AttributeError: Can't pickle local object 'computation.. function1 using multiprocessing queue](https://stackoverflow.com/questions/52091113/attributeerror-cant-pickle-local-object-computation-function1-using-multipr) – Darkonaut May 04 '19 at 12:58
  • I think you missed the point. The problem is that it is a nested function in the first place and that will just not work. You need to move the function outside to the top-level. – Darkonaut May 04 '19 at 13:05
  • @Darkonaut I see, but the whole point of nesting the definition was to be able to use the `L`, `B`, and `sys` as defined at the beginning of `simulation()` without passing them to `individual_sim`. Does this mean that the only option I have is to pass a whole bunch of variables to `individual_sim`? – semola May 04 '19 at 13:09
  • You could make them globals but the cleaner way would be indeed to pass them as arguments. – Darkonaut May 04 '19 at 13:11
  • @Darkonaut I have tried removing the nesting, and I don't get the pickle error any longer, but the problem of empty lists persists. I have edited the question as well... – semola May 04 '19 at 13:18
  • would be interesting to see what you did to provoke a `SIGSEGV`! that shouldn't happen unless you're poking around in something low-level – Sam Mason May 04 '19 at 14:17
  • @SamMason I have tried to reproduce the problem but couldn't. I'll post it if I manage to. But I can assure you I don't really do anything fancy/lowlevel... – semola May 04 '19 at 14:18

2 Answers

It's not really clear to me what your business logic is here, but you cannot modify globals in your parent from within your child processes. Separate processes don't share their address space.

You could make L a Manager.list and B a Manager.Value to modify them from your worker processes, though. Manager objects live in a separate server process and you modify them through proxy objects. Furthermore, you would need to hold a Manager.Lock while modifying these shared objects to prevent data corruption.

Here is a stripped-down example which should get you started:

import time
from multiprocessing import Pool, Manager


def individual_sim(mlist, mvalue, mlock, idx):
    # in your real computation, make sure to not hold the lock longer than
    # really needed (e.g. calculations without holding lock)
    with mlock:
        mlist[idx] += 10
        mvalue.value += sum(mlist)


def simulate(n_workers, n):

    with Manager() as m:
        mlist = m.list([i for i in range(n)])
        print(mlist)
        mvalue = m.Value('i', 0)
        mlock = m.Lock()

        iterable = [(mlist, mvalue, mlock, i) for i in range(n)]

        with Pool(processes=n_workers) as pool:
            pool.starmap(individual_sim, iterable)

        # convert to non-shared objects before terminating manager
        mlist = list(mlist)
        mvalue = mvalue.value

    return mlist, mvalue


if __name__=="__main__":

    N_WORKERS = 4
    N = 20

    start = time.perf_counter()
    L, B = simulate(N_WORKERS, N)
    print(L)
    print(B)
    print("Time elapsed: ",time.perf_counter() - start)

Example Output:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
5900
Time elapsed:  0.14064819699706277

Process finished with exit code 0

It would also be possible to use Pool's initializer-parameter to pass proxies upon worker initialization and register them as globals instead of sending them as regular arguments with the starmap-call.
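A rough sketch of that initializer variant, assuming the same list/value/lock setup as the example above. The names `init_worker`, `shared_list`, `shared_value` and `shared_lock` are chosen here for illustration only:

```python
from multiprocessing import Pool, Manager


def init_worker(mlist, mvalue, mlock):
    """Runs once in every worker process: register the proxies as globals."""
    global shared_list, shared_value, shared_lock
    shared_list, shared_value, shared_lock = mlist, mvalue, mlock


def individual_sim(idx):
    # the proxies were stored as globals by init_worker
    with shared_lock:
        shared_list[idx] += 10
        shared_value.value += sum(shared_list)


def simulate(n_workers, n):
    with Manager() as m:
        mlist = m.list(range(n))
        mvalue = m.Value('i', 0)
        mlock = m.Lock()

        with Pool(processes=n_workers, initializer=init_worker,
                  initargs=(mlist, mvalue, mlock)) as pool:
            pool.map(individual_sim, range(n))

        # convert to non-shared objects before the manager shuts down
        return list(mlist), mvalue.value


if __name__ == "__main__":
    L, B = simulate(n_workers=4, n=20)
    print(L)
    print(B)
```

The proxies are then sent to each worker only once, at start-up, rather than with every task, and the task function takes just the index.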

A bit more about Manager usage (relevant: nested proxies) I've written up here.

Darkonaut

the multiprocessing module works by forking the master process (or executing more copies of the Python interpreter, especially under Windows).

therefore, you'll see global variables, but they won't be shared between processes --- unless you go to special measures like, e.g., explicitly sharing memory. you're better off passing the required state in as function parameters (or via the Pool's initializer and initargs) and passing results back via the return value.

this tends to limit your design choices a bit, especially if you need to pass a lot of state around (e.g. as data you want to fit)

it's a very lightweight wrapper around pretty low-level primitives, hence it's not as featureful as things like Dask, but performance tends to be better if you can live with the constraints
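as a rough sketch of the "state in via the initializer, results back via the return value" approach, applied to the toy class from the question (the `init_worker` helper and the global name `system` are my own, picked for illustration):

```python
from multiprocessing import Pool


class a_system:
    def __init__(self, N):
        self.N = N
        self.L = [0] * N

    def comp(self, n):
        self.L[n] = 1
        return self.L

    def reset(self):
        self.L = [0] * self.N


def init_worker(N):
    """Runs once per worker: build that worker's private a_system."""
    global system
    system = a_system(N)


def individual_sim(i):
    system.reset()
    row = list(system.comp(i))  # copy, so a later reset cannot alter it
    return row, sum(row)


def simulate(N_mc):
    with Pool(initializer=init_worker, initargs=(N_mc,)) as pool:
        results = pool.map(individual_sim, range(N_mc))
    # all aggregation happens in the parent, from the returned values
    L = [row for row, _ in results]
    B = sum(b for _, b in results)
    return L, B


if __name__ == "__main__":
    L, B = simulate(N_mc=5)
    print(L)  # one row per iteration, with a single 1 at index i
    print(B)  # 5
```

each worker gets its own `a_system` instance, nothing is shared, and the parent builds `L` and `B` from what `pool.map` returns (in input order).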

editing to include some demo code which assumes the N_mc variable in your question relates to you doing some Monte Carlo/randomised approximation. I start by pulling in some libraries:

from multiprocessing import Pool

from PIL import Image
import numpy as np

and define a worker function and code for its initialisation:

def initfn(path):
    # make sure worker processes don't share RNG state, see:
    #   https://github.com/numpy/numpy/issues/9650
    np.random.seed()

    global image
    with Image.open(path) as img:
        image = np.asarray(img.convert('L'))

def worker(i, nsamps):
    height, width = image.shape
    subset = image[
        np.random.randint(height, size=nsamps),
        np.random.randint(width, size=nsamps),
    ]
    return np.mean(subset)

def mc_mean(path, nsamples, niter):
    with Pool(initializer=initfn, initargs=(path,)) as pool:
        params = [(i, nsamples) for i in range(niter)]
        return pool.starmap(worker, params)

i.e. initfn reads a JPEG/PNG file into a numpy array, then worker just calculates the average value (i.e. brightness) for some random subset of pixels. Note that color images are loaded as 3d arrays, indexed by [row, col, channel] (channels are conventionally 0=red, 1=green, 2=blue); the code above converts the image to greyscale ('L'), which gives a 2d array. We also explicitly call np.random.seed to make sure that our worker processes don't get the same sequence of random values.

We can then run this and plot the output to make sure everything looks OK:

import scipy.stats as sps
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style='ticks')

filename = 'an_image.jpeg'
result = mc_mean(filename, 1234, 56789)

# Histogram of our results
plt.hist(result, 201, density=True, alpha=0.5, edgecolor='none')

# also calculate/display expected distribution
with Image.open(filename) as img:
    arr = np.asarray(img.convert('L'))
    # approximate distribution of monte-carlo error 
    mcdist = sps.norm(np.mean(arr), np.std(arr) / np.sqrt(1234))

mn,mx = plt.xlim()
plt.xlim(mn, mx)

x = np.linspace(mn, mx, 201)
plt.plot(x, mcdist.pdf(x), '--', color='C1')
sns.despine()

which should give us something like:

[Figure: MC distribution]

obviously this will depend on the image used, this is from this JPEG.

Sam Mason
  • Thanks, I'll try passing everything as arguments. How does one use `initializer` and `initargs` instead? Does it mean that the args passed as `initargs` will have shared memory? Something like `Pool(processes =4,initargs=(L,B))`? The [docs](https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.pool) don't quite explain it... – semola May 04 '19 at 14:44
  • I have tried writing `individual_sim` so that it takes everything as input, but I still get empty lists (I've edited the question as well, adding a PS3). What am I doing wrong? – semola May 04 '19 at 15:14