
I am having difficulty parallelising the generation of a list of arrays. Each array is generated independently and then appended to a list. Somehow `multiprocessing.Pool.apply_async()` leaves me with an empty list when I feed it complicated arguments.

More specifically, to give some context, I am attempting to implement a machine learning algorithm using parallelisation. The idea is the following: I have a 'system' and an 'agent' which performs actions on the system. To teach the agent (in this case a neural net) how to behave optimally (with respect to a certain reward scheme that I have omitted here), the agent needs to generate trajectories of the system by applying actions to it. From the reward obtained for those actions, the agent then learns what to do and what not to do. Note that the possible actions are referred to in the code as integers:

    possible_actions = [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]
    

So here I am attempting to generate many such trajectories using multiprocessing (sorry, the code is not runnable here as it requires many other files, but I'm hoping somebody can spot the issue):

    from quantum_simulator_EC import system
    from reinforce_keras_EC import Agent
    import multiprocessing as mp


    s = system(1200, N=3)
    s.set_initial_state([0,0,1])  
    agent = Agent(alpha=0.0003,  gamma=0.95, n_actions=len( s.actions ))


    def get_result(result):
        global action_batch
        action_batch.append(result)

    def generate_trajectory(s, agent):

        sequence_of_actions = []

        for k in range( 5 ):

            net_input = s.generate_net_input_FULL(6)
            action = agent.choose_action( net_input )
    
            sequence_of_actions.append(action)
    

        return sequence_of_actions
    
    action_batch = []

    pool = mp.Pool(2)
    for i in range(0, batch_size):
        pool.apply_async(generate_trajectory, args=(s,agent), callback=get_result)
    pool.close()
    pool.join()

    print(action_batch)

The problem is that the code prints an empty list `[]`. Can somebody explain what the issue is? Are there restrictions on the kind of arguments I can pass to `apply_async`? In this example I am passing my system `s` and my `agent`, both complicated objects. I mention this because when I test the code with simple arguments like integers or matrices instead of the agent and system, it works fine. If there is no obvious reason why it's not working, tips for debugging the code would also be helpful.

Note that there is no problem if I drop multiprocessing and replace the last part with:

    action_batch = []

    for i in range(0, batch_size):
        get_result(generate_trajectory(s, agent))

    print(action_batch)

In this case the output is as expected, a list of sequences of 5 actions:

    [[4, 2, 1, 1, 7], [8, 2, 2, 12, 1], [8, 1, 9, 11, 9], [7, 10, 6, 1, 0]]

Vim154
  • Are you trying to return the result of a fully finished process, or intermediate results? In the first case you can directly assign the return values of `pool.apply_async` to a list and get the values using `result.get()`; in the second case I would advise using a queue. – Thymen Jan 22 '21 at 16:15
  • The fully finished process. Each process outputs a list, which should then be appended to another list that will contain all the generated outputs. By assigning the return values from `pool.apply_async` to a list, do you mean that I should define a list of results `results = []` and then append the result of `apply_async` to it multiple times in the following way: `results.append(pool.apply_async(generate_trajectory, args=(s, agent)))`? – Vim154 Jan 23 '21 at 18:03
  • Yeah, basically that, I have written an answer to demonstrate the full process, since it also requires a `close` and `join` call, as explained [here](https://stackoverflow.com/a/55035444/10961342). – Thymen Jan 23 '21 at 19:32

1 Answer

The final results can be appended directly to a list in the main process; there is no need for a callback function. Then you can close and join the pool, and finally retrieve all the results with `get()`.

See the following two examples, using `apply_async` and `starmap_async` (see this post for the difference).

Solution using `apply_async`:

    import multiprocessing as mp
    import time


    def func(s, agent):
        print(f"Working on task {agent}")
        time.sleep(0.1)  # some task
        return (s, s, s)


    if __name__ == '__main__':
        agent = "My awesome agent"
        with mp.Pool(2) as pool:
            results = []
            for s in range(5):
                results.append(pool.apply_async(func, args=(s, agent)))
            pool.close()
            pool.join()

        print([result.get() for result in results])

Solution using `starmap_async`:

    import multiprocessing as mp
    import time


    def func(s, agent):
        print(f"Working on task {agent}")
        time.sleep(0.1)  # some task
        return (s, s, s)


    if __name__ == '__main__':
        agent = "My awesome agent"
        with mp.Pool(2) as pool:
            result = pool.starmap_async(func, [(s, agent) for s in range(5)])
            pool.close()
            pool.join()

        print(result.get())

Output:

    Working on task My awesome agent
    Working on task My awesome agent
    Working on task My awesome agent
    Working on task My awesome agent
    Working on task My awesome agent
    [(0, 0, 0), (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4)]

Thymen
  • So I have tried it both ways (with `s` and `agent` being the object instances from my code), but the code raises an error at `result.get()`, both for `apply_async` and for `starmap_async`. The error is: `TypeError: can't pickle weakref objects`. Do you have any idea what the problem could be? The only real difference between your examples and my code is that in my case `s` and `agent` are object instances that interact to generate the lists of integers, but I assumed that should not matter... – Vim154 Jan 23 '21 at 20:33
  • The final results are mutable, and this is not allowed when passing them on. You can make them immutable before returning (`tuple(result)`). – Thymen Jan 23 '21 at 20:43
  • So in the case of your first example, I have added `results2 = tuple(results)`, and then I modified the print statement to `print( results2[0].get() )`. I still get the error `TypeError: can't pickle weakref objects` – Vim154 Jan 23 '21 at 21:02
  • I might have been unclear in the previous message: the return values from `generate_trajectory` should be cast to `tuple`. It is also possible that your agent class is not picklable; in that case it should be reconstructed in the function itself. It might then be easier to create a `Process` instead. – Thymen Jan 23 '21 at 21:18
  • I tried to make the function return a `tuple` as you suggested, but the same error message remains. I also attempted to use `mp.Process` together with `mp.Queue` but that does not work either. The problem is similar: it works as long as I feed `Process` with simple integer or matrix arguments, but it hangs (i.e. it runs indefinitely, with no error message) when I try to feed it my system and agent objects. – Vim154 Jan 25 '21 at 07:59
  • In that case we would require a (minimal) part of the agent that reproduces your issue. – Thymen Jan 25 '21 at 08:57