
I am trying to run a multiprocessing function and store its output in an array somehow, but cannot seem to find a way to do so. This is what I have so far:

import multiprocessing

resultsAr = []
inputData = [a, b, c, d]  # THIS ARRAY CONTAINS DATA

def function(data):
    values = [some_Number_1, some_Number_2]
    resultsAr.append(values)
    print('Values = ', values)        # THIS WORKS - THE CORRECT VALUES ARE PRINTED
    print('resultsAr = ', resultsAr)  # WORKS AS WELL

def parallel_run(function, inputData):  # a function that runs on multiple processors
    cpu_no = 4
    if len(inputData) < cpu_no:
        cpu_no = len(inputData)
    p = multiprocessing.Pool(cpu_no)
    p.starmap_async(function, inputData, chunksize=1)
    p.close()
    p.join()
    print('resultsAr = ', resultsAr)  # THIS PRINTS OUT AN EMPTY ARRAY!

parallel_run(function, inputData)
Yair

2 Answers


What's happening is that Python spins up child processes for your Pool and copies the input data over to each of them. Each child process gets its own copy of `resultsAr`, and your function mutates that copy; that's why the mutations show up in the print statements inside `function`. The copy in the parent process is never touched, so when execution returns to the parent, the original `resultsAr` is still empty.

The canonical way to do what you're describing here is to just use a Pool.map call, like so:

import multiprocessing

resultsAr = []
inputData = [a, b, c, d]

def function(data):
    values = [some_Number_1, some_Number_2]
    return values

def parallel_run(function, inputData):
    cpu_no = 4
    if len(inputData) < cpu_no:
        cpu_no = len(inputData)
    p = multiprocessing.Pool(cpu_no)
    # this needs to be declared global to rebind
    # the resultsAr defined at module scope
    global resultsAr
    resultsAr = p.map(function, inputData, chunksize=1)
    p.close()
    p.join()
    print('resultsAr = ', resultsAr)

Also, note that `starmap_async` expects each element of `inputData` to be an iterable of arguments that it unpacks into the function's parameter list. Since your `function` takes a single `data` argument, passing plain elements through `starmap_async` will raise an exception unless each element is itself a one-element sequence; `map`, which passes each element as-is, is the better fit here.

Haldean Brown
  • That works (at least partly) Thanks @Haldean Brown! BUT - How can I print / access those results now? It seems that resultsAr is not a list anymore but a `MapResult` (I had to use the `starmap_async` function instead of the `map` one. Not sure if that makes any difference). – Yair Jan 24 '17 at 00:06
  • What version of Python are you using, and how are you importing multiprocessing? Both the docs and my local Python install agree that `pool.map` returns an iterable (on my computer, it's a list) – Haldean Brown Jan 24 '17 at 00:21
  • I am using `pool.starmap_async` instead of `pool.map` (due to the nature of my function). I could not find any evidence about `starmap_async` being iterable or not. I am using python 3.5.1 with anaconda spyder 2.3.8 on a windows PC. import by `import multiprocessing`. – Yair Jan 24 '17 at 09:34
  • I have found the answer (as posted in the comments [here](http://stackoverflow.com/questions/26238691/counting-total-number-of-tasks-executed-in-a-multiprocessing-pool-during-executi)). I have to add a `get` expression. – Yair Jan 24 '17 at 10:56

Based on the answer above, I have found that `pool.starmap_async` returns an `AsyncResult` object rather than a plain list of results. This is resolved by adding a `get` call to the code, like so:

import multiprocessing

resultsAr = []
inputData = [a, b, c, d]

def function(data):
    values = [some_Number_1, some_Number_2]
    return values

def parallel_run(function, inputData):
    cpu_no = 4
    if len(inputData) < cpu_no:
        cpu_no = len(inputData)
    p = multiprocessing.Pool(cpu_no)

    global resultsAr
    resultsAr = p.starmap_async(function, inputData, chunksize=1)
    real_result = resultsAr.get()  # blocks until all tasks finish

    p.close()
    p.join()
    print('real_result = ', real_result)
Yair