
I have written some code to perform calculations in parallel (with joblib) and update a dictionary with the calculation results. The code consists of a main function, which calls a generator function and a calculation function to be run in parallel. Each instance of the calculation function adds its result (a key:value pair) to a dictionary that is created in the main function and marked as global.

Below is a simplified version of my code, illustrating the procedure described above.

When everything runs, the result dictionary (d_result) is empty, even though it should have been populated with the results generated by the calculation function. Why is that?

import numpy as np
from joblib import Parallel, delayed


def do_calc(d, r, pair_index): # function to be run in parallel

    data_1 = d[str(r)][pair_index, 1]
    data_2 = d[str(r)][pair_index, 2]
    result_name = str(data_1) + " ^ " + str(data_2)
    result = data_1 ** data_2
    d_result[result_name] = result
    # d_result.setdefault(result_name, []).append(result)  ## same result as above


def compute_indices(d): # generator function

    for r in d:
        num_pairs = d[str(r)].shape[0]
        for pair_index in range(num_pairs):
            yield r, pair_index


def process(): # main function

    global d_result
    d_result = {}
    r1 = np.array([['ab', 1, 2], ['vw', 10, 12]], dtype=object)
    r2 = np.array([['ac', 1, 3], ['vx', 10, 13]], dtype=object)
    r3 = np.array([['ad', 1, 4], ['vy', 10, 14]], dtype=object)
    r4 = np.array([['ae', 1, 5], ['vz', 10, 15]], dtype=object)
    d = {'r1': r1, 'r2': r2, 'r3': r3, 'r4': r4}
    Parallel(n_jobs=4)(delayed(do_calc)(d, r, pair_index) for r, pair_index in compute_indices(d))
    print(d_result)


process()
Nukolas
  • I'm guessing here because I'm not familiar with joblib. But it seems you are creating new processes where the calculation will be done when you call Parallel(), but trying to print the value of `d_result` immediately. You probably need to wait until the parallel jobs have finished. – Paul Cornelius Apr 04 '17 at 08:07
  • @Paul Cornelius That shouldn't affect things - I can put a print statement in the `do_calc()` function and see that it prints each time the function is run, prior to printing `d_result` in the main function. – Nukolas Apr 04 '17 at 22:08

2 Answers


I am glad you got your program to work. However, I think you have overlooked something important, and you might run into trouble if you use your example as a basis for larger programs.

I scanned the docs for joblib, and discovered that it's built on the Python multiprocessing module. So the multiprocessing programming guidelines apply.

At first I could not figure out why your new program ran successfully and the original one did not. Here is the reason (from the link above): "Bear in mind that if code run in a child process tries to access a global variable, then the value it sees (if any) may not be the same as the value in the parent process at the time that Process.start was called."

This is because each child process has, at least conceptually, its own copy of the Python interpreter. In each child process, the code that is used by that process must be imported. If that code declares globals, the two processes will have separate copies of those globals, even though it doesn't look that way when you read the code. So when your original program's child process put data into the global d_result, it was actually a different object from the d_result in the parent process.

From the docs again: "Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such as starting a new process).

For example, under Windows running the following module would fail with a RuntimeError:

from multiprocessing import Process

def foo():
    print('hello')

p = Process(target=foo)
p.start()

Instead one should protect the entry point of the program by using if __name__ == '__main__'."

So it is important to guard the entry point of your program (the second version) by replacing the bare call on the last line with:

if __name__ == "__main__":
    process()

Failure to do this can result in some nasty bugs that you don't want to spend time with.
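
To make this concrete, here is a minimal sketch (an added illustration, not part of the original answer) showing that a write to a global inside a child process never reaches the parent:

from multiprocessing import Process

d_result = {}  # global in the parent process

def child():
    # Item assignment mutates the child's own copy of d_result,
    # not the dictionary living in the parent process.
    d_result['from_child'] = 42
    print('child sees:', d_result)   # {'from_child': 42}

if __name__ == '__main__':
    p = Process(target=child)
    p.start()
    p.join()
    print('parent sees:', d_result)  # {} -- the child's write is lost

This is exactly what happened in the original program: each worker filled in its own private copy of d_result, and the parent's dictionary stayed empty.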

Paul Cornelius

OK, I've figured it out. Answer and new code below:

The do_calc() function now creates an empty dict, populates it with a single key:value pair, and returns the dict.

By default, the parallel bit in process() collects whatever do_calc() returns into a list. So what I end up with after the parallelised do_calc() is a list of dicts.

What I really want is a single dict, so using a dict comprehension I convert the list of dicts to a single dict, and voilà, she's all good!

This helped: python convert list of single key dictionaries into a single dictionary
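
For instance, a toy illustration of that merge (not taken from the linked post):

list_of_dicts = [{'a': 1}, {'b': 2}, {'c': 3}]
merged = {k: v for x in list_of_dicts for k, v in x.items()}
print(merged)  # {'a': 1, 'b': 2, 'c': 3}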

import numpy as np
from joblib import Parallel, delayed


def do_calc(d, r, pair_index):  # calculation function to be run in parallel

    data_1 = d[str(r)][pair_index, 1]
    data_2 = d[str(r)][pair_index, 2]
    result_name = str(data_1) + " ^ " + str(data_2)
    result = data_1 ** data_2
    d_result = {}  # create empty dict
    d_result[result_name] = result  # add key:value pair to dict
    return d_result  # return dict


def compute_indices(d):  # generator function

    for r in d:
        num_pairs = d[str(r)].shape[0]
        for pair_index in range(num_pairs):
            yield r, pair_index


def process():  # main function

    r1 = np.array([['ab', 1, 2], ['vw', 10, 12]], dtype=object)
    r2 = np.array([['ac', 1, 3], ['vx', 10, 13]], dtype=object)
    r3 = np.array([['ad', 1, 4], ['vy', 10, 14]], dtype=object)
    r4 = np.array([['ae', 1, 5], ['vz', 10, 15]], dtype=object)
    d = {'r1': r1, 'r2': r2, 'r3': r3, 'r4': r4}
    # parallelised calc.  Each run returns dict, final output is list of dicts
    d_result = Parallel(n_jobs=4)(delayed(do_calc)(d, r, pair_index) for r, pair_index in compute_indices(d))
    # transform list of dicts to dict
    d_result = {k: v for x in d_result for k, v in x.items()}
    print(d_result)

process()
Nukolas
  • This solution does not directly answer the question "how do I update a global dictionary within a parallelised loop"; rather, it is a workaround which achieves the same intended end result for this specific case. – Nukolas Apr 06 '17 at 00:22
  • A better alternative: `return result_name, result` and `d_result = {k: v for (k, v) in d_result}` – abdelgha4 Mar 30 '22 at 19:42
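
A minimal sketch of that suggestion (an added illustration, reusing the names from the answer's code): each job returns a (key, value) tuple, and the dict is built once at the end, with no throwaway per-job dicts.

def do_calc(d, r, pair_index):  # returns a (key, value) tuple instead of a dict
    data_1 = d[str(r)][pair_index, 1]
    data_2 = d[str(r)][pair_index, 2]
    return str(data_1) + " ^ " + str(data_2), data_1 ** data_2

pairs = Parallel(n_jobs=4)(delayed(do_calc)(d, r, pair_index)
                           for r, pair_index in compute_indices(d))
d_result = dict(pairs)  # equivalent to {k: v for (k, v) in pairs}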