
I am trying to use parallel computing to train many XGBoost models. I am using joblib. I tried the code below and it doesn't work:

# SET UP
from joblib import Parallel, delayed
import numpy as np
import pandas as pd
import xgboost as xgb
# prepare data
X = pd.DataFrame(np.random.randint(0,100,size=(100, 4)),
                    index=range(100), columns=['f1', 'f2', 'f3', 'f4'])
y = pd.Series(np.random.randint(0,100,size=100))
data = xgb.DMatrix(X, y)
# prepare parameters to try out
param_nms = ['subsample', 'learning_rate', 'reg_alpha', 'reg_lambda', 
             'colsample_bytree', 'colsample_bylevel', 'gamma', 'min_child_weight']
# firefly (defined elsewhere in my code) is an iterable of candidate value
# tuples, one value per parameter name
params_ls = [{param_nm: f for param_nm, f in zip(param_nms, fs)}
             for fs in firefly]

# TRY 1: this doesn't work
def eval_params(params):
    return xgb.train(params, data)
Parallel(n_jobs=4)(delayed(eval_params)(params) for params in params_ls)

The error message is this:

---------------------------------------------------------------------------
_RemoteTraceback                          Traceback (most recent call last)
_RemoteTraceback: 
"""
Traceback (most recent call last):
  File "C:\Users\119433\Anaconda3\envs\py38\lib\site-packages\joblib\externals\loky\backend\queues.py", line 153, in _feed
    obj_ = dumps(obj, reducers=reducers)
  File "C:\Users\119433\Anaconda3\envs\py38\lib\site-packages\joblib\externals\loky\backend\reduction.py", line 271, in dumps
    dump(obj, buf, reducers=reducers, protocol=protocol)
  File "C:\Users\119433\Anaconda3\envs\py38\lib\site-packages\joblib\externals\loky\backend\reduction.py", line 264, in dump
    _LokyPickler(file, reducers=reducers, protocol=protocol).dump(obj)
  File "C:\Users\119433\Anaconda3\envs\py38\lib\site-packages\joblib\externals\cloudpickle\cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
ValueError: ctypes objects containing pointers cannot be pickled
"""

The above exception was the direct cause of the following exception:

PicklingError                             Traceback (most recent call last)
~\AppData\Local\Temp/ipykernel_24472/3216539096.py in <module>
     18 def eval_params(params):
     19     return xgb.train(params, data)
---> 20 Parallel(n_jobs=4)(delayed(eval_params)(params) for params in params_ls)
     21 

~\Anaconda3\envs\py38\lib\site-packages\joblib\parallel.py in __call__(self, iterable)
   1052 
   1053             with self._backend.retrieval_context():
-> 1054                 self.retrieve()
   1055             # Make sure that we get a last message telling us we are done
   1056             elapsed_time = time.time() - self._start_time

~\Anaconda3\envs\py38\lib\site-packages\joblib\parallel.py in retrieve(self)
    931             try:
    932                 if getattr(self._backend, 'supports_timeout', False):
--> 933                     self._output.extend(job.get(timeout=self.timeout))
    934                 else:
    935                     self._output.extend(job.get())

~\Anaconda3\envs\py38\lib\site-packages\joblib\_parallel_backends.py in wrap_future_result(future, timeout)
    540         AsyncResults.get from multiprocessing."""
    541         try:
--> 542             return future.result(timeout=timeout)
    543         except CfTimeoutError as e:
    544             raise TimeoutError from e

~\Anaconda3\envs\py38\lib\concurrent\futures\_base.py in result(self, timeout)
    442                     raise CancelledError()
    443                 elif self._state == FINISHED:
--> 444                     return self.__get_result()
    445                 else:
    446                     raise TimeoutError()

~\Anaconda3\envs\py38\lib\concurrent\futures\_base.py in __get_result(self)
    387         if self._exception:
    388             try:
--> 389                 raise self._exception
    390             finally:
    391                 # Break a reference cycle with the exception in self._exception

PicklingError: Could not pickle the task to send it to the workers.

This thread suggests class scope is not picklable, and this post suggests using a different package. But this example with sklearn's SVM model works just fine (roughly the pattern sketched below), so I tried to do the same thing with xgboost, and it works!
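A minimal sketch of the kind of sklearn snippet I mean (the SVR estimator and the C values here are my own illustration, not the linked example):

# FOR REFERENCE: the sklearn pattern that parallelizes fine
from sklearn.svm import SVR
X_np = np.random.randint(0, 100, size=(100, 4))
y_np = np.random.randint(0, 100, size=100)
def fit_svm(C):
    # only plain numpy arrays and a float get pickled and shipped to the workers
    return SVR(C=C).fit(X_np, y_np)
svm_models = Parallel(n_jobs=4)(delayed(fit_svm)(C) for C in [0.1, 1.0, 10.0])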

# TRY 2: this works!
def eval_model(X, y, params):
    model = xgb.XGBRegressor(**params)
    return model.fit(X, y)
Parallel(n_jobs=4)(delayed(eval_model)(X, y, params) for params in params_ls)

So it doesn't seem to be a class-scope problem, and I can't figure out where the problem lies. Why does TRY 1 fail while TRY 2 succeeds?


1 Answer


If you add the training data as an input to the eval_params function (as you did for the eval_model function), your code will work.

import numpy as np
import pandas as pd
import xgboost as xgb
from joblib import Parallel, delayed

X = pd.DataFrame(
    data=np.random.randint(0, 100, size=(100, 4)),
    index=range(100),
    columns=['f1', 'f2', 'f3', 'f4']
)

y = pd.Series(data=np.random.randint(0, 100, size=100))

params = [
    {'subsample': 0.1, 'learning_rate': 0.5},
    {'subsample': 0.2, 'learning_rate': 0.6},
    {'subsample': 0.3, 'learning_rate': 0.7},
    {'subsample': 0.4, 'learning_rate': 0.8},
]

def eval_params(X, y, params):
    data = xgb.DMatrix(X, y)
    return xgb.train(params, data)

out = Parallel(n_jobs=4)(delayed(eval_params)(X, y, params_) for params_ in params)

print(out)
# [<xgboost.core.Booster object at 0x7f8912e82f50>, <xgboost.core.Booster object at 0x7f8912e56a10>, <xgboost.core.Booster object at 0x7f8912e8cd50>, <xgboost.core.Booster object at 0x7f8912e56950>]
  • It does, but why though? I want to avoid regenerating the DMatrix for each model, since the data is the same for all of them. – Sara Nov 10 '21 at 01:02