I have a list, model_trace, containing a large number of different ML models. If I do something like:
import gc
# Run every model in the trace sequentially in the current process.
# NOTE(review): indentation was lost in the paste; `del model` and
# `gc.collect()` are assumed to belong to the loop body.
for model in model_trace:
    model.predict(test_data)
    # Unbinds only the loop variable; model_trace still references the model.
    del model
    # Request a garbage-collection pass after each prediction.
    gc.collect()
At some point the kernel gets killed because RAM usage keeps increasing over time (calling predict on any single model on its own is no problem at all). I have read that there is no way to force Python to release all of its memory, so I tried running the predictions in child processes with multiprocessing:
from multiprocessing import Pool
from functools import partial
def predict_(model, test_inputs_=None):
    """Return the result of ``model.predict(test_inputs_)``.

    The model is the first positional parameter so that the test inputs can
    be bound by keyword (e.g. with ``functools.partial``) and the resulting
    callable can be mapped over a sequence of models.
    """
    predictions = model.predict(test_inputs_)
    return predictions
# Bind the shared test inputs so each pool task only needs a model argument.
worker = partial(
    predict_,
    test_inputs_=x_test,
)
# Pool of four worker processes.
pool = Pool(4)
# One task per model in model_trace; results come back as a list in input order.
preds = pool.map(worker, model_trace)
# No further tasks will be submitted; wait for the worker processes to exit.
pool.close()
pool.join()
But this produces a "Too many open files" error. It works for a small number of models, but running a loop over small batches of models also produces the "Too many open files" error.
The following works:
from multiprocessing import Pool
def predict_(test_inputs_):
    """Return ``model.predict(test_inputs_)`` for the module-level ``model``.

    ``model`` is not a parameter; it is resolved as a global name each time
    the function is called.
    """
    predictions = model.predict(test_inputs_)
    return predictions
# The loop variable `model` is the module-level name that predict_ looks up.
for model in model_trace:
    # A fresh single-worker pool per model; it is shut down when the
    # with-block exits.
    with Pool(1) as pool:
        # Only x_test is passed as a task argument. `res` is overwritten on
        # every iteration.
        res = pool.apply(predict_, (x_test,))
However, I can't put this code inside a function run_predictions, as this produces the following error:
AttributeError: Can't pickle local object 'run_predictions.<locals>.predict_'
Also, since I am using multiprocessing and the predictions could be done in parallel, it wouldn't feel natural to just run the predictions one after the other.
If I don't wrap model.predict(x_test) inside the predict_ function, I get the "Too many open files" error again, and I also observe that the number of open files increases with every iteration. Calling pool.terminate() does not change this.
The models in my model trace are Gaussian process models implemented with gpytorch. The following example model trace produces the described errors:
import copy
import torch
import gpytorch
import math
class GaussianProcessRegression(gpytorch.models.ExactGP):
    """Exact GP regressor with a zero mean and a scaled RBF kernel.

    The model is constructed without training data. ``fit`` attaches the data
    and ``predict`` returns the flattened predictive mean.
    """

    def __init__(self, likelihood):
        """Build the model around ``likelihood`` with no training data."""
        # Training inputs and targets both start out as None.
        super().__init__(None, None, likelihood)
        self.likelihood = likelihood
        self.mean_module = gpytorch.means.ZeroMean()
        rbf_kernel = gpytorch.kernels.RBFKernel()
        self.covar_module = gpytorch.kernels.ScaleKernel(rbf_kernel)

    def forward(self, inputs):
        """Return the multivariate normal given by the mean and kernel modules."""
        mean = self.mean_module(inputs)
        covar = self.covar_module(inputs)
        return gpytorch.distributions.MultivariateNormal(mean, covar)

    def fit(self, train_inputs, train_targets):
        """Attach the training data and switch to train mode.

        Only the data is stored here; no optimisation loop is run.
        """
        # A 1-D input tensor gets a trailing dimension; anything else is
        # stored unchanged. The inputs are kept as a one-element tuple.
        if train_inputs.ndimension() == 1:
            prepared = train_inputs.unsqueeze(-1)
        else:
            prepared = train_inputs
        self.train_inputs = (prepared,)
        self.train_targets = train_targets

        # train mode on
        self.train()
        self.likelihood.train()

    def predict(self, test_inputs):
        """Return the flattened mean of the likelihood applied to the model output."""
        self.eval()
        self.likelihood.eval()
        with torch.no_grad(), gpytorch.settings.fast_pred_var():
            model_output = self(test_inputs)
            observed_pred = self.likelihood(model_output)
            return torch.flatten(observed_pred.mean)
def calc_model_trace(model, n_new_samples, n_total_samples, inputs, targets):
    """Return deep-copied snapshots of ``model`` fitted on growing data prefixes.

    For each ``n`` in ``range(n_new_samples, n_total_samples, n_new_samples)``
    the model is fitted on ``inputs[:n]`` / ``targets[:n]`` and a deep copy of
    it is stored. ``model`` itself is refitted in place on every step.
    """

    def _fit_and_snapshot(n_seen):
        # Refit the shared model on the first n_seen samples, then copy it so
        # later refits do not affect the stored snapshot.
        model.fit(inputs[:n_seen], targets[:n_seen])
        return copy.deepcopy(model)

    prefix_sizes = range(n_new_samples, n_total_samples, n_new_samples)
    return [_fit_and_snapshot(n_seen) for n_seen in prefix_sizes]
# data
# 100 evenly spaced training inputs on [0, 1].
x_train = torch.linspace(0, 1, 100)
# One period of a sine wave plus Gaussian noise scaled by sqrt(0.04) = 0.2.
y_train = torch.sin(x_train * (2 * math.pi)) + torch.randn(x_train.size()) * math.sqrt(0.04)
# 201 evenly spaced test inputs on [0, 2].
x_test = torch.linspace(0, 2, 201)
likelihood = gpytorch.likelihoods.GaussianLikelihood()
gpr_model = GaussianProcessRegression(likelihood)
# range(5, 5000, 5) yields 999 snapshots, so the trace holds 999 model copies.
# NOTE(review): x_train has only 100 points, so slices with n > 100 presumably
# return the full training set and most snapshots see identical data; confirm
# this is intended purely to produce a long trace.
model_trace = calc_model_trace(gpr_model, 5, 5000, x_train, y_train)