I am new to dask, and it is nice to have a module that makes parallelization easy. I am working on a project where I was able to parallelize a loop on a single machine, as you can see here. However, I would like to move over to dask.distributed. I applied the following changes to the class above:

diff --git a/mlchem/fingerprints/gaussian.py b/mlchem/fingerprints/gaussian.py
index ce6a72b..89f8638 100644
--- a/mlchem/fingerprints/gaussian.py
+++ b/mlchem/fingerprints/gaussian.py
@@ -6,7 +6,7 @@ from sklearn.externals import joblib
 from .cutoff import Cosine
 from collections import OrderedDict
 import dask
-import dask.multiprocessing
+from dask.distributed import Client
 import time


@@ -141,13 +141,14 @@ class Gaussian(object):
         for image in images.items():
             computations.append(self.fingerprints_per_image(image))

+        client = Client()
         if self.scaler is None:
-            feature_space = dask.compute(*computations, scheduler='processes',
+            feature_space = dask.compute(*computations, scheduler='distributed',
                                          num_workers=self.cores)
             feature_space = OrderedDict(feature_space)
         else:
             stacked_features = dask.compute(*computations,
-                                            scheduler='processes',
+                                            scheduler='distributed',
                                             num_workers=self.cores)

             stacked_features = numpy.array(stacked_features)

Doing so generates this error:

 File "/usr/local/Cellar/python/3.7.2_2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
    is not going to be frozen to produce an executable.''')
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

I have tried different ways of adding if __name__ == '__main__': without any success. This can be reproduced by running this example. I would appreciate it if anyone could help me figure this out. I have no clue how I should change my code to make it work.

Thanks.

Edit: The example is cu_training.py.

muammar

2 Answers

Creating a Client starts up new processes, so it has to happen inside the if __name__ == '__main__': block, as described in this SO question or this GitHub issue.

This is the same as with the multiprocessing module.
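To illustrate the point about the multiprocessing module, here is a minimal sketch that uses only the standard library. The names `square` and `main` are hypothetical and not taken from the question's code. The assumption is that the same layout carries over to dask.distributed, with `Client()` created inside `main()` instead of a `Pool`.

```python
import multiprocessing


def square(x):
    """Work done in a worker process; defined at module level so workers can import it."""
    return x * x


def main():
    # Anything that starts processes lives in here. With dask.distributed,
    # this is where `client = Client()` would be created (assumption: the
    # rest of the computation is then called from this function as well).
    with multiprocessing.Pool(processes=2) as pool:
        return pool.map(square, range(5))


if __name__ == "__main__":
    # When child processes are spawned rather than forked, each child
    # re-imports this file under a different __name__, so the guard keeps
    # the children from trying to start workers of their own.
    print(main())
```

Only the entry script needs the guard. Library modules can define the functions and classes as usual, as long as nothing at import time starts processes.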

MRocklin
  • Thanks, @MRocklin. I had read the links you sent in your answer. However, I have not yet found a way to change my code to make this work. – muammar Mar 10 '19 at 03:04
  • I finally understood you, @MRocklin. I fixed it here https://github.com/muammar/mlchem/commit/c4ab997ccab30c9aa3a5943cf4b205f9e04885e4 I will try to refactor my code because I don't much like having to write a function just to run the calculations, but maybe this is simply the intended way of using distributed. Not sure yet. Great tool, by the way. – muammar Mar 10 '19 at 05:52
  • The answer you point to starts with "On Windows". Why am I running into this on Linux? I don't when I try to do multiprocessing via the concurrent.futures module... – jammertheprogrammer Jul 12 '22 at 22:35

I still ran into several issues in my code even after adding the if __name__ == '__main__': guard.

I am using several Python files and modules, and multiprocessing is used by only one function for a sampling operation. The only fix that worked for me was to put the guard on the very first line of the first file that runs, before everything else, including the imports. The following worked well:

if __name__ == '__main__':
    from mjrl.utils.gym_env import GymEnv
    from mjrl.policies.gaussian_mlp import MLP
    from mjrl.baselines.quadratic_baseline import QuadraticBaseline
    from mjrl.baselines.mlp_baseline import MLPBaseline
    from mjrl.algos.npg_cg import NPG
    from mjrl.algos.dapg import DAPG
    from mjrl.algos.behavior_cloning import BC
    from mjrl.utils.train_agent import train_agent
    from mjrl.samplers.core import sample_paths
    import os
    import json
    import mjrl.envs
    import mj_envs
    import time as timer
    import pickle
    import argparse

    import numpy as np 

    # ===============================================================================
    # Get command line arguments
    # ===============================================================================

    parser = argparse.ArgumentParser(description='Policy gradient algorithms with demonstration data.')
    parser.add_argument('--output', type=str, required=True, help='location to store results')
    parser.add_argument('--config', type=str, required=True, help='path to config file with exp params')
    args = parser.parse_args()
    JOB_DIR = args.output
    if not os.path.exists(JOB_DIR):
        os.mkdir(JOB_DIR)
    with open(args.config, 'r') as f:
        job_data = eval(f.read())
    assert 'algorithm' in job_data.keys()
    assert any([job_data['algorithm'] == a for a in ['NPG', 'BCRL', 'DAPG']])
    job_data['lam_0'] = 0.0 if 'lam_0' not in job_data.keys() else job_data['lam_0']
    job_data['lam_1'] = 0.0 if 'lam_1' not in job_data.keys() else job_data['lam_1']
    EXP_FILE = JOB_DIR + '/job_config.json'
    with open(EXP_FILE, 'w') as f:
        json.dump(job_data, f, indent=4)

    # ===============================================================================
    # Train Loop
    # ===============================================================================

    e = GymEnv(job_data['env'])
    policy = MLP(e.spec, hidden_sizes=job_data['policy_size'], seed=job_data['seed'])
    baseline = MLPBaseline(e.spec, reg_coef=1e-3, batch_size=job_data['vf_batch_size'],
                           epochs=job_data['vf_epochs'], learn_rate=job_data['vf_learn_rate'])
devil in the detail