0

I have a lengthy function called run below that contains a few instances of appending data.

from multiprocessing import Process

data = []

def run():
    global data
    ...
    data.append(trace)
    ...

if __name__ == '__main__':
    jobs = []

    gen_count = 0
    leaked_count = 0
    system_count = 0

    N = 100

    for i in range(N):
        p = Process(target=run)
        jobs.append(p)
        p.start()

However, using multiprocessing no data is appended. In addition, the function run returns several values that need to be added to gen_count, leaked_count, and system_count and I am not sure how to retrieve those values. I chose multiprocessing because running run in a for-loop is slow and each iteration was independent of the rest. I would like to incorporate GPU acceleration in this code later for anyone that has any ideas on that.

So my questions are the following:

  1. Should I even be using multiprocessing as opposed to threading?
  2. Why is trace not being appended to data?
  3. How can I retrieve the output of run within the multiprocessing block?

Edit:

from plotly.offline import init_notebook_mode
import plotly.graph_objs as go
import plotly as py
import time
import Cross_Section_Loading
from multiprocess import Process, Pool, Queue, Manager, cpu_count
from functools import partial
import numpy as np
init_notebook_mode(connected=True)

...

def particle_func(x, y, z):

    leaked = 0
    nu = 0

    # get initial direction
    theta = np.random.uniform(0, np.pi, 1)
    phi = np.random.uniform(0, 2 * np.pi, 1)

    # compute energy via rejection sampling
    expfiss = lambda e: 0.453 * np.exp(-1.036 * e / 1.0e6) * np.sinh(np.sqrt(2.29 * e / 1.0e6))

    min_eng = np.min(E)
    max_eng = np.max(E)
    max_prob = np.max(expfiss(E))

    rejected = 1
    while rejected:
        a = np.random.uniform(min_eng, max_eng, 1)
        b = np.random.uniform(0, max_prob, 1)
        rel_prob = expfiss(a)
        if b <= rel_prob:
            energy = a
            rejected = 0

    alive = 1

    # vector to keep track of positions
    xvec = np.ones(1) * x
    yvec = np.ones(1) * y
    zvec = np.ones(1) * z

    while alive:
        # Get real/new cross-sections for corresponding energy
        index = energy_lookup(E, energy)

        interacted = 0
        total_distance = 0
        # Interacted may still be alive (scattering)
        while interacted == 0:

            ###################################################
            # Determine starting location for sample distance using sigma_total
            material_start = material_type(x, y)

            if material_start == 1:
                sig_tot = sigma_total_fuel(ENRICHMENT_1)[index]
            elif material_start == 2:
                sig_tot = sigma_total_fuel(ENRICHMENT_2)[index]
            elif material_start == 3:
                sig_tot = sigma_total_fuel(ENRICHMENT_3)[index]
            else:
                sig_tot = sigma_total_mod[index]

            ###################################################

            if material_start == 1 or material_start == 2 or material_start == 3:  # if in fuel pin

                # Get distance to edge of fuel rod (from fuel)
                d = distance_to_edge(x, y, phi)

                # get sample distance to collision
                s = -np.log(1.0 - np.random.random(1)) / sig_tot

                # Incidence on interface (denoted by code "no-interface")
                if d != 'no-interface':

                    # Sample distance is greater than interface distance (does not account for material change)
                    # Must convert between 2D and 3D
                    if s * np.sin(theta) > d:
                        total_distance += d / np.sin(theta)

                    # Sample distance is correct and interaction occurs
                    else:
                        total_distance += s
                        interacted = 1

                # Statement may be redundant but idk how to handle return from distance_to_rod
                else:
                    total_distance += s
                    interacted = 1

            else:               # if in moderator
                # get distance to edge of fuel rod (from moderator)
                d = distance_to_edge(x, y, phi)

                # get distance to collision
                s = -np.log(1.0 - np.random.random(1)) / sig_tot

                # Incidence on interface (denoted by code "no-interface")
                if d != 'no-interface':

                    # Sample distance is greater than interface distance (does not account for material change)
                    # Must convert between 2D and 3D
                    if s * np.sin(theta) > d:
                        total_distance += d / np.sin(theta)  # <- Right conversion?

                    # Sample distance is correct and interaction occurs
                    else:
                        total_distance += s
                        interacted = 1

                # Statement may be redundant but idk how to handle return from distance_to_rod
                else:
                    total_distance += s
                    interacted = 1

            # move particle
            z += total_distance * np.cos(theta)
            y += total_distance * np.sin(theta) * np.sin(phi)
            x += total_distance * np.sin(theta) * np.cos(phi)

        # material_end = material_type(x, y)
        #
        # if material_start != material_end:
        #     print("Neutron has crossed material interface(s)")

        # Trace/Track particle movement
        xvec = np.append(xvec, x)
        yvec = np.append(yvec, y)
        zvec = np.append(zvec, z)

        ###################################################

        # Leakage
        if x > X_BOUNDARY or x < -X_BOUNDARY:
            # Still need implementation
            leaked = 1
            alive = 0

        if y > Y_BOUNDARY or y < -Y_BOUNDARY:
            # Still need implementation
            leaked = 1
            alive = 0

        if z > HEIGHT or z < 0:
            # Still need implementation
            leaked = 1
            alive = 0

        ###################################################

        # Determine Type of interaction based on energy and corresponding cross-sections
        # In fuel
        material = material_type(x, y)
        if material == 1:
            sig_scat_temp = sigma_scatter_fuel(ENRICHMENT_1)[index]
            sig_fiss_temp = sigma_fission_fuel(ENRICHMENT_1)[index]
            sig_tot_temp = sigma_total_fuel(ENRICHMENT_1)[index]
            nu_temp = nu_fuel(ENRICHMENT_1)[index]

            # scatter or absorb
            if np.random.random(1) < sig_scat_temp / sig_tot_temp:

                # scatter, pick new angles & energy
                theta = np.random.uniform(0, np.pi, 1)
                phi = np.random.uniform(0, 2 * np.pi, 1)
                energy = np.random.uniform(alpha_fuel * energy, energy, 1)

            elif np.random.random(1) < sig_fiss_temp / sig_tot_temp:

                # Determine number of neutrons produced from fission
                # round or int or both?
                nu = int(round(nu_temp))
                alive = 0

            else:
                # absorbed
                alive = 0

        #############################

        elif material == 2:
            sig_scat_temp = sigma_scatter_fuel(ENRICHMENT_2)[index]
            sig_fiss_temp = sigma_fission_fuel(ENRICHMENT_2)[index]
            sig_tot_temp = sigma_total_fuel(ENRICHMENT_2)[index]
            nu_temp = nu_fuel(ENRICHMENT_2)[index]

            # scatter or absorb
            if np.random.random(1) < sig_scat_temp / sig_tot_temp:

                # scatter, pick new angles & energy
                theta = np.random.uniform(0, np.pi, 1)
                phi = np.random.uniform(0, 2 * np.pi, 1)
                energy = np.random.uniform(alpha_fuel * energy, energy, 1)

            elif np.random.random(1) < sig_fiss_temp / sig_tot_temp:

                # Determine number of neutrons produced from fission
                # round or int or both?
                nu = int(round(nu_temp))
                alive = 0

            else:
                # absorbed
                alive = 0

        #############################

        if material == 3:
            sig_scat_temp = sigma_scatter_fuel(ENRICHMENT_3)[index]
            sig_fiss_temp = sigma_fission_fuel(ENRICHMENT_3)[index]
            sig_tot_temp = sigma_total_fuel(ENRICHMENT_3)[index]
            nu_temp = nu_fuel(ENRICHMENT_3)[index]

            # scatter or absorb
            if np.random.random(1) < sig_scat_temp / sig_tot_temp:

                # scatter, pick new angles & energy
                theta = np.random.uniform(0, np.pi, 1)
                phi = np.random.uniform(0, 2 * np.pi, 1)
                energy = np.random.uniform(alpha_fuel * energy, energy, 1)

            elif np.random.random(1) < sig_fiss_temp / sig_tot_temp:

                # Determine number of neutrons produced from fission
                # round or int or both?
                nu = int(round(nu_temp))
                alive = 0

            else:
                # absorbed
                alive = 0

        #############################

        # In water
        else:
            mod_scat = sigma_scatter_mod[index]
            mod_tot = sigma_total_mod[index]

            # scatter or absorb
            if np.random.random(1) < mod_scat / mod_tot:

                # scatter, pick new angles & energy
                theta = np.random.uniform(0, np.pi, 1)
                phi = np.random.uniform(0, 2 * np.pi, 1)
                energy = np.random.uniform(alpha_mod * energy, energy, 1)

            else:
                # absorbed
                alive = 0

        ###################################################

    return xvec, yvec, zvec, nu, leaked


##################################################################


def run(data_test):

    ###################################################

    # Uniformly Dispersed Source (Cylinder)
    # x = np.random.uniform(-X_BOUNDARY, X_BOUNDARY, 1)
    # y = np.random.uniform(-Y_BOUNDARY, Y_BOUNDARY, 1)
    # z = np.random.uniform(-HEIGHT, HEIGHT, 1)

    # Uniformly Dispersed FUEL Source (Cylinder)
    rejected = 1
    while rejected:
        x = np.random.uniform(-X_BOUNDARY, X_BOUNDARY, 1)
        y = np.random.uniform(-Y_BOUNDARY, Y_BOUNDARY, 1)
        z = np.random.uniform(-HEIGHT, HEIGHT, 1)
        if material_type(x, y):
            rejected = 0

    ###################################################

    # Get normal particle info (trace)
    x_vec, y_vec, z_vec, nu, leaked = particle_func(x, y, z)
    leaked_count = leaked
    gen_count = nu
    system_count = (1 + nu - leaked)

    # particle_trace = go.Scatter3d(
    #     x=x_vec,
    #     y=y_vec,
    #     z=z_vec,
    #     mode='lines',
    #     line=dict(color='rgb(173, 255, 47)')
    # )
    #
    # data_test.append(particle_trace)

    data_test.append((x_vec, y_vec, z_vec))

    ###################################################

    nu_vec = [nu]
    x_vecs = [x_vec]
    y_vecs = [y_vec]
    z_vecs = [z_vec]

    if nu > 0:
        print("{} neutrons generated for neutron {}".format(nu, i))

    else:
        print("No neutrons generated for neutron {}".format(i + 1))

    t = 0
    recent_nus = nu_vec
    while np.any(recent_nus) != 0:

        print(nu_vec[-t:])

        tracker = 0

        nu_vec_temp = []

        x_vecs_temp = []
        y_vecs_temp = []
        z_vecs_temp = []

        for a in range(len(nu_vec[-t:])):

            x = x_vecs[-(a + 1)][-1]
            y = y_vecs[-(a + 1)][-1]
            z = z_vecs[-(a + 1)][-1]

            for j in range(nu_vec[-(a + 1)]):
                x_vec, y_vec, z_vec, nu, leaked = particle_func(x, y, z)
                leaked_count += leaked

                print("Particle {} starting coords:".format(j + 1), x_vec[0], y_vec[0], z_vec[0])
                print("Particle {} ending coords:".format(j + 1), x_vec[-1], y_vec[-1], z_vec[-1])
                print("Particle {} nu value".format(j + 1), nu)

                nu_vec_temp.append(nu)
                tracker += 1

                x_vecs_temp.append(x_vec)
                y_vecs_temp.append(y_vec)
                z_vecs_temp.append(z_vec)

                # time.sleep(1)

                # particle_trace = go.Scatter3d(
                #     x=x_vec,
                #     y=y_vec,
                #     z=z_vec,
                #     mode='lines',
                #     line=dict(color='rgb(255, 0, 0)')
                # )

                # data_test.append(particle_trace)
                data_test.append((x_vec, y_vec, z_vec))

            print()
            t = tracker

        nu_vec.extend(nu_vec_temp)
        x_vecs.extend(x_vecs_temp)
        y_vecs.extend(y_vecs_temp)
        z_vecs.extend(z_vecs_temp)

        recent_nus = nu_vec_temp

        print("Continuing fission:", (np.any(recent_nus) != 0))

    return leaked_count, gen_count, system_count

##################################################################

if __name__ == '__main__':
    jobs = []

    manager = Manager()
    list_ = manager.list()
    for _ in range(cpu_count() - 1):
        p = Process(target=run, args=(list_,))
        jobs.append(p)
        p.start()
        p.join()
    while True:  # stops main thread from completing execution
        time.sleep(5)  # wait 5 second before checking if processes are terminated
        if all([not x.is_alive() for x in jobs]):  # check if all processes terminated
            break  # breaks the loop

# print("\nTotal number of neutrons in system:", system_count)
# print("Total number of neutrons generated from {} neutron source: {}".format(N, gen_count))
# print("System Multiplication factor:", gen_count/N)
# print("Total number of leaked neutrons:", leaked_count)


layout = go.Layout(
    title='Monte Carlo Assembly',
    autosize=True,
    showlegend=False,
    height=1000,
    width=1000,
    scene=dict(zaxis=dict(range=[-1, HEIGHT + 1]),
               yaxis=dict(range=[-(Y_DIM * PITCH + 5), (Y_DIM * PITCH + 5)]),
               xaxis=dict(range=[-(X_DIM * PITCH + 5), (X_DIM * PITCH + 5)])
               ),
)

fig = go.Figure(data=data, layout=layout)
py.offline.plot(fig, filename='file.html')

Output & Error Message:

/Users/sterlingbutters/anaconda/bin/python "/Users/sterlingbutters/PycharmProjects/Monte Carlo Simulation/MC Plotly (Cylindrical Assembly) Reflector.py"
For 17 x 17 assembly, 9 x 9 is needed. Your shape: (9, 9)
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
[(array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294]))]
Exception ignored in: <function WeakValueDictionary.__init__.<locals>.remove at 0x114ea8488>
Traceback (most recent call last):
  File "/Users/sterlingbutters/anaconda/lib/python3.5/weakref.py", line 117, in remove
TypeError: 'NoneType' object is not callable
Exception ignored in: <function WeakValueDictionary.__init__.<locals>.remove at 0x114ea8488>
Traceback (most recent call last):
  File "/Users/sterlingbutters/anaconda/lib/python3.5/weakref.py", line 117, in remove
TypeError: 'NoneType' object is not callable
Exception ignored in: <function WeakValueDictionary.__init__.<locals>.remove at 0x114ea8488>
Traceback (most recent call last):
  File "/Users/sterlingbutters/anaconda/lib/python3.5/weakref.py", line 117, in remove
TypeError: 'NoneType' object is not callable

Process finished with exit code 0
Sterling Butters
  • 1,024
  • 3
  • 20
  • 41
  • You need an object in shared memory (currently, every process gets its own copy of data structures), so you could use a [Manager](https://docs.python.org/2/library/multiprocessing.html#managers) among others e.g. a `Queue`. Threading almost certainly won't give a speed up due to the GIL. Spawning a single process, as you have, also won't speed things up. Either divide the wok beforehand and send chunks to different processes, or put the whole job to a [`Pool`](https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.pool) – roganjosh May 26 '17 at 20:03
  • how would you implement a Pool in the above? I made some attempts awhile back but was not successful – Sterling Butters May 26 '17 at 20:05
  • What would your recommendation be: a Queue or Pool? – Sterling Butters May 26 '17 at 20:06
  • Your example is too minimal I think to make a call. Try both. You aren't restricted to a Queue, you could use a `manager.list()` as I also linked. – roganjosh May 26 '17 at 20:07
  • 1
    Also, I missed the `for` loop which is bad. You're spawning 100 processes, which is almost certainly more than your number of cores. Set that at a reasonable value (maybe number of cores - 1) and divide the work accordingly – roganjosh May 26 '17 at 20:12
  • So how can I get as much concurrency as possible without overloading my system? – Sterling Butters May 26 '17 at 20:13
  • I'm afraid I'm not sure how I would "set" that value or divide the work accordingly? – Sterling Butters May 26 '17 at 20:16
  • 1
    Well, my crappy laptop has 4 theoretical cores, so if I had a huge list whose members I had to process in the same way, I'd chunk that list into 3, spawn 3 processes and send each of those processes their own, unique, chunk of that list. Then recombine the results. – roganjosh May 26 '17 at 20:18
  • Ahhh I see what you are saying, so I could use a for-loop for 100 runs under 3 processes and get 300 results? That makes more sense, but it may take me awhile to figure it all out – Sterling Butters May 26 '17 at 20:31

2 Answers2

2

Multiprocessing spawns a different Process with it's own global variables copies from current environment. All the changes in variable made in that process does not reflect in parent process. You need to share memory between the process and variables in shared memory can be exchanged.

You can use multiprocessing.Manager to create a shared object like list or dictionary, and manipulate that object.

Processes are assigned to different cores/thread of your processor. If you have a 4 core/8 thread system, spawn a maximum of 7 processes to maximize performance, any more than that some processes will interfere with other processes and can slow down/reduce the cpu time allotted to your os which can crash your system. It's always the cpu cores/cpu threads -1 processes for stable processing leaving atleast one core to os to handle other operations.

You can modify your code like this

from multiprocessing import Process, Manager
import time

def run(list_):
    list_.append(trace)

if __name__ == "__main__":
    jobs = []
    gen_count = 0
    leaked_count = 0
    system_count = 0

    with Manager() as manager:
        list_ = manager.list()
        for _ in range(multiprocessing.cpu_count()-1):
            p = Process(target=run,args=(list_))
            jobs.append(p)
            p.start()
        while True: #stops main thread from completing execution
            time.sleep(5) #wait 5 second before checking if processes are terminated
            if all([not x.is_alive() for x in jobs]): #check if all processes terminated
                break #breaks the loop 
Harwee
  • 1,601
  • 2
  • 21
  • 35
  • thank you for your answer, please see question edit, any ideas on what the problem might be? – Sterling Butters May 26 '17 at 20:59
  • @SterlingButters you need to wait till all the processes have completed the execution. I guess the problem is your main code completed execution before all the processes spawned. Refer this https://stackoverflow.com/questions/25455462/share-list-between-process-in-python-server for more information – Harwee May 26 '17 at 23:08
  • Please see the new error message, what do you think the problem is now? – Sterling Butters May 27 '17 at 01:04
  • I cannot understand what the problem is, are your arguments picklable? because that is the part where the error is – Harwee May 27 '17 at 01:42
  • There are no arguments for "run" other than list_ which was used for the manager. Running print(dill.pickles(list_)) gives True so my guess is yes. – Sterling Butters May 27 '17 at 01:50
  • is this helpful: https://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods – Sterling Butters May 27 '17 at 01:58
  • Perhaps I should be using pathos? – Sterling Butters May 27 '17 at 02:10
  • dill can pickle some datatypes which default pickle cannot,`dill.pickles(list_)` will always work because 'list_' is always picklabe. The problem might be with variable `trace`. `Manager` uses default pickle to serialize the objects. Try to serialize the `trace` with default pickle and if it fails, you can use dill to convert it to serialized string and append it to list and deserialize when needed – Harwee May 27 '17 at 02:21
  • That would make sense because the trace object is a Plotly graphing object. How would I go about doing as you suggested? – Sterling Butters May 27 '17 at 02:26
  • convert `list_.append(trace)` to `list_.append(dill.dumps(trace))`. When processing element of list_ use `dill.loads(element)`. If dill could serialize plotly object this should work. Have a look here https://github.com/plotly/plotly.py/issues/579 pickle cannot serialize plotly graph object. – Harwee May 27 '17 at 02:54
  • So I made the appropriate conversion and I cannot process any element of list_ because it returns empty with a close error (see question edit for new error) – Sterling Butters May 27 '17 at 03:04
  • I can't help you much if that is the case, the problem is with plotly object, you can try threading but that would not cause any increase in performance. You can start the same code multiple times but you cannot have them at the same place unless they are serializable. I don't think pathos will be of great help as it uses dill as serializing tool (i think) – Harwee May 27 '17 at 03:10
  • Well I do have another option and that is to append a tuple of arrays to list_ and process afterwards, however when I do that, my results are very weird. I think we determined plotly object will not work so I will edit my question to remove error messages and include most of the code. Thank you so much for the help btw – Sterling Butters May 27 '17 at 03:15
  • post a new question with the problem, and if you don't have a problem with converting plot to raw image you can append images as file objects. – Harwee May 27 '17 at 08:57
  • Here is new question: https://stackoverflow.com/questions/44219027/python-multiprocessing-performing-undesired-identical-processes – Sterling Butters May 27 '17 at 16:20
1

The way multiprocessing works, each subtask runs in its own memory space and gets its own copy of any global variables. A common way around this limitation to effectively have shared data is to use a multiprocessing.Manager to coordinate concurrent access to it and transparently prevent any problems that might cause.

Below is an example of doing that based on your sample code. It also uses a multiprocessing.Pool() which makes it easy to create a fixed-size collection of process objects that can each provide asynchronous results from each subtask (or wait until all of them are finished before retrieving them, as is being done here).

from functools import partial
import multiprocessing

def run(data, i):
    data.append('trace%d' % i)
    return 1, 2, 3  # values to add to gen_count, leaked_count, and system_count

if __name__ == '__main__':
    N = 10
    manager = multiprocessing.Manager()  # create SyncManager
    data = manager.list()  # create a shared list
    pool = multiprocessing.Pool()

    async_result = pool.map_async(partial(run, data), range(N))
    values = tuple(zip(*async_result.get()))
    gen_count = sum(values[0])
    leaked_count = sum(values[1])
    system_count = sum(values[2])

    print(data)
    print('Totals:  gen_count {}, leaked_count {}, system_count {}'.format(
            gen_count, leaked_count, system_count))

Output:

['trace0', 'trace1', 'trace2', 'trace4', 'trace3', 'trace5', 'trace8', 'trace6', 'trace7', 'trace9']
Totals:  gen_count 10, leaked_count 20, system_count 30
martineau
  • 119,623
  • 25
  • 170
  • 301
  • Im afraid this didnt work, perhaps for the same reason listed here (bug): https://github.com/jonathanslenders/ptpython/issues/193 The issue is only 20 days old – Sterling Butters May 27 '17 at 00:56
  • @SterlingButters: Sounds like ptpython has pickle problems—which is very bad for doing multiprocessing (which depends on it). The code in your link looks fine to me. – martineau May 27 '17 at 03:58
  • using your algorithm in my code exactly gives me an empty list for async_result... I have no idea why – Sterling Butters May 27 '17 at 04:49
  • @SterlingButters: You have to call `async_result.get()` to retrieve the list. – martineau May 27 '17 at 08:47
  • Using async_result.get() ultimately returns TypeError: 'NoneType' object is not callable; using async_result.get returns > – Sterling Butters May 27 '17 at 15:56
  • I'll say what I said earlier, it sounds like `multiprocessing` is broken in the python environment you're using. – martineau May 27 '17 at 16:04
  • hmmm interesting, an exact copy of your code works in my environment though, I'm not sure if that makes a difference – Sterling Butters May 27 '17 at 16:13
  • Are you using `pool.map_async()` or `imap_unordered()` shown in the github link...or something else? They (or their result) are used differently. – martineau May 27 '17 at 18:52