0

I have two threads: one handles training and the other handles estimation (prediction). I have several entities and I would like to have a separate model for each entity, so I load and save models "on the fly" (I know this is quite slow).

If I load the model every time I call the predict function, it works. However, if I load the model only once and then make several predictions in a row, I get the following exceptions:

Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.6/threading.py", line 1182, in run
    self.function(*self.args, **self.kwargs)
  File "/home/arroyadr/Proyectos/iot-ai-engine/src/trainer.py", line 388, in train
    self.update_prediction_historics_all()
  File "/home/arroyadr/Proyectos/iot-ai-engine/src/trainer.py", line 413, in update_prediction_historics_all
    self.update_prediction_historics_dataset(new_dataset, loadModel=True)
  File "/home/arroyadr/Proyectos/iot-ai-engine/src/trainer.py", line 444, in update_prediction_historics_dataset
    loadModel=False)[0]
  File "/home/arroyadr/Proyectos/iot-ai-engine/src/estimator.py", line 207, in get_predictions_sequential
    prediction = model.predict(new_data)
  File "/home/arroyadr/.local/lib/python3.6/site-packages/keras/engine/training.py", line 1169, in predict
    steps=steps)
  File "/home/arroyadr/.local/lib/python3.6/site-packages/keras/engine/training_arrays.py", line 294, in predict_loop
    batch_outs = f(ins_batch)
  File "/home/arroyadr/.local/lib/python3.6/site-packages/keras/backend/tensorflow_backend.py", line 2715, in __call__
    return self._call(inputs)
  File "/home/arroyadr/.local/lib/python3.6/site-packages/keras/backend/tensorflow_backend.py", line 2671, in _call
    session)
  File "/home/arroyadr/.local/lib/python3.6/site-packages/keras/backend/tensorflow_backend.py", line 2623, in _make_callable
    callable_fn = session._make_callable_from_options(callable_opts)
  File "/home/arroyadr/.local/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 1431, in _make_callable_from_options
    return BaseSession._Callable(self, callable_options)
  File "/home/arroyadr/.local/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 1385, in __init__
    session._session, options_ptr, status)
  File "/home/arroyadr/.local/lib/python3.6/site-packages/tensorflow/python/framework/errors_impl.py", line 526, in __exit__
    c_api.TF_GetCode(self.status.status))
tensorflow.python.framework.errors_impl.InvalidArgumentError: Tensor lstm_1_input:0, specified in either feed_devices or fetch_devices was not found in the Graph

> Exception ignored in: <bound method BaseSession._Callable.__del__ of <tensorflow.python.client.session.BaseSession._Callable object at 0x7f1ca2b33748>>
Traceback (most recent call last):
  File "/home/arroyadr/.local/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 1415, in __del__
    self._session._session, self._handle, status)
  File "/home/arroyadr/.local/lib/python3.6/site-packages/tensorflow/python/framework/errors_impl.py", line 526, in __exit__
    c_api.TF_GetCode(self.status.status))
tensorflow.python.framework.errors_impl.CancelledError: Session has been closed.

My code to load the model and predict is like the following:

def load_model_file(self, path=None):
    """
    Load the model stored at ``path`` together with its per-feature scalers.

    When the module-level ``sklearn`` flag is falsy the file is loaded as a
    Keras model with ``load_model``; otherwise it is unpickled with
    ``joblib.load``. The loaded model and scalers are cached on
    ``self.model_predict`` / ``self.scalers_predict`` so later predictions
    can reuse them without reloading.

    :param path: path of the model file. If it is None, or does not point to
        an existing file, nothing is loaded.
    :return: ``(model, scalers)`` on success, ``(None, None)`` otherwise.
    """
    # Loading is deliberately not wrapped in a Lock created here: a Lock built
    # inside this function is private to each call and can never block another
    # thread. If loads must be serialized across threads, use a single Lock
    # shared by all callers (e.g. one defined at module level).
    model = None
    scalers = None
    if path is not None and os.path.isfile(path):
        if not sklearn:
            model = load_model(path)
            # Build the predict function eagerly and remember the graph the
            # model lives in, so predictions can later run under that graph.
            # NOTE: nodes of previously loaded models stay in this same
            # default graph; nothing here clears them.
            model._make_predict_function()
            self.graph = tf.get_default_graph()
        else:
            model = joblib.load(path)

        # One scaler per feature. The file name is built from the 4th
        # '/'-separated component of ``path`` up to its first '.', so this
        # presumably expects paths like '../rsc/<dir>/<name>.<ext>' — confirm.
        scalers = [
            joblib.load(
                '../rsc/datasets/scalers/' + path.split("/")[3].split(".")[0] + str(i) + '.pkl')
            for i in range(self.num_features_dataset)
        ]

    if model is None:
        self.logger.error("No model could be found for " + str(path))
        self.model_predict = None
        self.scalers_predict = None
        return None, None

    self.model_predict = model
    self.scalers_predict = scalers
    return model, scalers


def get_predictions_sequential(self, data, num_pred, column_data, path=None, loadModel=True):
    """
    Predict ``num_pred`` values from the time-series window ``data``.

    :param data: time-series window to predict from; it is reshaped to
        ``(1, self.num_previous_measures, self.num_features_dataset)``
    :param num_pred: number of predictions to produce
    :param column_data: 1-based index of the feature column whose scaler (and
        values) are passed to ``self.invert_scale`` to un-scale each prediction
    :param path: path of the model file; only used when ``loadModel`` is True
    :param loadModel: if True, load the model and scalers from ``path`` via
        ``self.load_model_file``; otherwise reuse the ones cached on
        ``self.model_predict`` / ``self.scalers_predict``
    :return: list of ``num_pred`` rescaled predictions
    """
    if loadModel:
        model, scalers = self.load_model_file(path)
    else:
        model = self.model_predict
        scalers = self.scalers_predict

    # Force a float array: the scaled values are written back into ``data``
    # in place below, and an integer array would silently truncate them.
    data = np.reshape(np.array(data, dtype=float),
                      (1, self.num_previous_measures, self.num_features_dataset))
    for i in range(self.num_features_dataset):
        window = data[:, :, i].copy().reshape(1, self.num_previous_measures)
        # Append one placeholder value so the row has num_previous_measures + 1
        # columns (presumably the width the scaler was fitted on — confirm);
        # the placeholder is dropped again after the transform.
        window = np.insert(window, self.num_previous_measures, data.mean())
        window = np.reshape(window, (1, self.num_previous_measures + 1))
        window = scalers[i].transform(window)
        data[:, :, i] = window[0][:-1]

    predictions = []
    new_data = data.copy()

    for _ in range(num_pred):
        # NOTE(review): only the Keras branch assigns ``prediction``; with the
        # ``sklearn`` flag set, the invert_scale call below raises
        # UnboundLocalError.
        if not sklearn:
            # Run predict under the graph recorded by load_model_file.
            with self.graph.as_default():
                prediction = model.predict(new_data)
        prediction_rescaled = self.invert_scale(scalers[column_data - 1],
                                                new_data[0, :, column_data - 1],
                                                prediction[0][0])
        predictions.append(prediction_rescaled)
        # TODO(review): new_data is never advanced with the new prediction, so
        # every iteration predicts from the same window.

    return predictions

I have researched this issue, but I have not managed to find a proper solution. Has anyone else run into this problem?

Dominique
  • 16,450
  • 15
  • 56
  • 112

1 Answer

0

After more investigation and some trial and error, I found a solution.

Regarding the "Session has been closed" (CancelledError) error, the first answer to this question may be useful:

K.clear_session() is useful when you're creating multiple models in succession, such as during hyperparameter search or cross-validation. Each model you train adds nodes (potentially numbering in the thousands) to the graph. TensorFlow executes the entire graph whenever you (or Keras) call tf.Session.run() or tf.Tensor.eval(), so your models will become slower and slower to train, and you may also run out of memory. Clearing the session removes all the nodes left over from previous models, freeing memory and preventing slowdown.

So I added a call to K.clear_session() before loading each new model; this prevents several models from being loaded into the same graph.

On the other hand, I did NOT have to call

with self.graph.as_default():
   with tf.Session(graph=self.graph) as sess:
       backend.set_session(sess)

The model-loading function now looks like this:

def load_model_file(self, path=None):
    """
    Load the model stored at ``path`` plus one scaler per dataset feature.

    When the module-level ``sklearn`` flag is falsy the file is treated as a
    Keras model: the Keras session is cleared first, so models loaded earlier
    do not accumulate in the same graph. The predict function is then built,
    and the resulting default graph is stored in ``self.graph``. Otherwise
    the file is unpickled with ``joblib.load``.

    :param path: path of the model file; None or a non-existent file means
        nothing is loaded
    :return: ``(model, scalers)`` when a model was loaded, else
        ``(None, None)``; the same pair is cached on ``self.model_predict`` /
        ``self.scalers_predict``
    """
    model = None
    scalers = None
    if path is not None and os.path.isfile(path):
        if sklearn:
            model = joblib.load(path)
        else:
            K.clear_session()
            model = load_model(path)
            model._make_predict_function()
            self.graph = tf.get_default_graph()

        # Scaler files are named after the 4th '/'-separated component of
        # ``path`` (up to its first '.'), followed by the feature index.
        scaler_dir = '../rsc/datasets/scalers/'
        scalers = [
            joblib.load(scaler_dir + path.split("/")[3].split(".")[0] + str(idx) + '.pkl')
            for idx in range(self.num_features_dataset)
        ]

    if model is not None:
        self.model_predict = model
        self.scalers_predict = scalers
        return model, scalers

    self.logger.error("No model could be found for " + str(path))
    self.model_predict = None
    self.scalers_predict = None
    return None, None

For multithreading problems, and for loading different models one after another, keep the following in mind:

Whenever get_predictions_sequential is called from other classes or threads, the graph should be set only inside get_predictions_sequential itself, directly around the predict call:

with self.graph.as_default():
    prediction = model.predict(new_data)

In those other classes, try NOT to call any TensorFlow graph-related or session-setting functions. Otherwise you will mix the graph used in one class with the graph used by the class that defines get_predictions_sequential.

Cheers