
Note that I have to sweep through more argument sets than there are available CPUs, so I'm not sure whether Python will automatically schedule the work across the CPUs as they become available.

Here is what I tried, but I get an error about the arguments:

import random
import multiprocessing
from train_nodes import run
import itertools

envs = ["AntBulletEnv-v0", "HalfCheetahBulletEnv-vo", "HopperBulletEnv-v0", "ReacherBulletEnv-v0",
        "Walker2DBulletEnv-v0", "InvertedDoublePendulumBulletEnv-v0"]
algs = ["PPO", "A2C"]
seeds = [random.randint(0, 200), random.randint(200, 400), random.randint(400, 600), random.randint(600, 800), random.randint(800, 1000)]

args = list(itertools.product(*[envs, algs, seeds]))

num_cpus = multiprocessing.cpu_count()

with multiprocessing.Pool(num_cpus) as processing_pool:
    processing_pool.map(run, args)

run takes three arguments: env, alg, and seed, but for some reason not all three are being passed here.

Sam Lerman
  • Does this answer your question? https://stackoverflow.com/questions/5442910/how-to-use-multiprocessing-pool-map-with-multiple-arguments – jkr Jan 13 '21 at 02:31

1 Answer


The function passed to multiprocessing.Pool.map must accept a single argument. One way to adapt your code is to write a small wrapper function that takes env, alg, and seed as one tuple, unpacks it, and passes the values on to run.

Another option is to use multiprocessing.Pool.starmap, which unpacks each tuple of arguments into a separate call to the function. Either way, the pool hands argument sets out to its worker processes as they finish, so it is fine to have more argument sets than CPUs.

import random
import multiprocessing
import itertools

envs = [
    "AntBulletEnv-v0",
    "HalfCheetahBulletEnv-vo",
    "HopperBulletEnv-v0",
    "ReacherBulletEnv-v0",
    "Walker2DBulletEnv-v0",
    "InvertedDoublePendulumBulletEnv-v0",
]
algs = ["PPO", "A2C"]
seeds = [
    random.randint(0, 200),
    random.randint(200, 400),
    random.randint(400, 600),
    random.randint(600, 800),
    random.randint(800, 1000),
]

args = list(itertools.product(envs, algs, seeds))

num_cpus = multiprocessing.cpu_count()

# sample implementation of `run`
def run(env, alg, seed):
    # do stuff
    return random.randint(0, 200)

def wrapper(env_alg_seed):
    env, alg, seed = env_alg_seed
    return run(env=env, alg=alg, seed=seed)

# option 1: use a wrapper with Pool.map
with multiprocessing.Pool(num_cpus) as processing_pool:
    # map returns the results as a list, in the same order as args
    results = processing_pool.map(wrapper, args)

# option 2: use starmap and call `run` directly
with multiprocessing.Pool(num_cpus) as processing_pool:
    results = processing_pool.starmap(run, args)
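
One more note: on platforms where the default start method is spawn (Windows, and macOS on Python 3.8+), the pool has to be created under an if __name__ == "__main__": guard, otherwise the child processes fail with a RuntimeError about the bootstrapping phase. A minimal sketch of that idiom (the run body here is just a stand-in):

import multiprocessing

def run(env, alg, seed):
    # stand-in for the real training call
    return 0

def main():
    envs = ["AntBulletEnv-v0", "HopperBulletEnv-v0"]
    algs = ["PPO", "A2C"]
    seeds = [1, 2, 3]
    args = [(env, alg, seed) for env in envs for alg in algs for seed in seeds]
    with multiprocessing.Pool(multiprocessing.cpu_count()) as processing_pool:
        return processing_pool.starmap(run, args)

# with spawn, each child re-imports this module, so the pool setup must be
# guarded to keep the children from trying to start pools of their own
if __name__ == "__main__":
    results = main()
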
jkr
  • Thanks, I think this is the answer, but I'm getting this error on my server. Do you know why? – Sam Lerman Jan 13 '21 at 02:38
  • Retrying (Retry(total=236, connect=236, read=240, redirect=240, status=240)) after connection broken by 'NewConnectionError(': Failed to establish a new connection: [Errno 111] Connection refused')': /auth.login – Sam Lerman Jan 13 '21 at 02:38
  • And on my local machine: – Sam Lerman Jan 13 '21 at 02:43
  • RuntimeError: An attempt has been made to start a new process before the current process has finished its bootstrapping phase. This probably means that you are not using fork to start your child processes and you have forgotten to use the proper idiom in the main module: if __name__ == '__main__': freeze_support() ... The "freeze_support()" line can be omitted if the program is not going to be frozen to produce an executable. – Sam Lerman Jan 13 '21 at 02:43
  • it sounds like the error is coming from your `run` function. the code i posted runs without issue. – jkr Jan 13 '21 at 14:47