This is basically a duplicate of "Keras + Tensorflow and Multiprocessing in Python", but my setup is a bit different, and the solution there doesn't work for me.
I need to train a Keras model against predictions made by another model. The predictions are tied to some CPU-heavy code, so I would like to parallelize them and run that code in worker processes. Here is the code I would like to execute:
```python
import numpy as np
from keras.layers import Input, Dense
from keras.models import Model
from keras.optimizers import Adam


def create_model():
    input_layer = Input((10,))
    dense = Dense(10)(input_layer)
    return Model(inputs=input_layer, outputs=dense)


model_outside = create_model()
model_outside.compile(Adam(1e-3), "mse")


def subprocess_routine(weights):
    model_inside = create_model()
    model_inside.set_weights(weights)
    while True:
        # lots of CPU
        batch = np.random.rand(10, 10)
        prediction = model_inside.predict(batch)
        yield batch, prediction


weights = model_outside.get_weights()
model_outside.fit_generator(subprocess_routine(weights),
                            epochs=10,
                            steps_per_epoch=100,
                            use_multiprocessing=True,
                            workers=1)
```
This produces the following error:

```
E tensorflow/core/grappler/clusters/utils.cc:81] Failed to get device properties, error code: 3
```
The answer to the question linked above is to move the Keras imports into the subprocess. I have added all the imports to `subprocess_routine` as well, but that doesn't change the error. It would probably be necessary to eliminate Keras imports from the main process altogether, but in my setup that would mean a huge refactoring.
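A possible explanation for why the imports inside the worker are not enough: on Linux, Python's `multiprocessing` has historically used the `fork` start method by default, so each child inherits a copy of the parent's state, likely including whatever TensorFlow already set up for CUDA in the main process, and CUDA does not support being used in a forked child after the parent has initialized it. The `spawn` start method gives each child a fresh interpreter instead. The following is a minimal, stdlib-only sketch of that structure, not a tested Keras solution: `predict_worker`, `spawn_predict_worker` and `iter_results` are hypothetical names, and the body of `predict_worker` uses a dot product as a stand-in for building the Keras model and calling `predict`.

```python
import multiprocessing as mp


def predict_worker(weights, n_batches, out_queue):
    """Meant to run inside a child started with the "spawn" method.

    A spawned child is a fresh interpreter: it inherits no CUDA context or
    TensorFlow session from the parent, so heavy imports belong here.
    """
    # Hypothetical stand-in: the real code would do something like
    #   import keras; model = create_model(); model.set_weights(weights)
    import random

    rng = random.Random(0)
    for _ in range(n_batches):
        batch = [rng.random() for _ in range(len(weights))]
        # Stand-in for model_inside.predict(batch): a dot product.
        prediction = sum(w * x for w, x in zip(weights, batch))
        out_queue.put((batch, prediction))
    out_queue.put(None)  # sentinel: no more batches from this worker


def spawn_predict_worker(weights, n_batches, maxsize=8):
    """Start predict_worker in a spawned process; return (process, queue)."""
    ctx = mp.get_context("spawn")
    queue = ctx.Queue(maxsize=maxsize)
    proc = ctx.Process(target=predict_worker, args=(weights, n_batches, queue))
    proc.start()
    return proc, queue


def iter_results(queue):
    """Yield (batch, prediction) pairs until the worker's sentinel arrives."""
    while True:
        item = queue.get()
        if item is None:
            return
        yield item
```

Because a spawned child re-imports the main module, `spawn_predict_worker` would have to be called under `if __name__ == "__main__":`, and everything passed to the child must be picklable (a list of weight arrays is, a generator object is not). Presumably that means handing `fit_generator` the parent-side `iter_results(queue)` with `use_multiprocessing=False` rather than letting Keras start the processes itself; `proc.join()` would follow once the sentinel has been read.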
Keras + multithreading seems to work (see the very last comment in https://github.com/keras-team/keras/issues/5640). In my code, it looks like this:
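```python
import tensorflow as tf

# Build the inner model in the main process and create its predict
# function eagerly (as suggested in the linked issue).
model_inside = create_model()
model_inside._make_predict_function()
graph = tf.get_default_graph()


def subprocess_routine(model_inside, graph):
    while True:
        batch = np.random.rand(10, 10)
        # Run predict against the graph the model was built in.
        with graph.as_default():
            prediction = model_inside.predict(batch)
        yield batch, prediction


model_outside.fit_generator(subprocess_routine(model_inside, graph),
                            epochs=10,
                            steps_per_epoch=100,
                            use_multiprocessing=True,
                            workers=1)
```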
But the error message is identical.
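The threading route has its own catch: with `use_multiprocessing=False` and `workers` greater than 1, Keras may call `next()` on the same generator from several threads, and a plain Python generator raises `ValueError: generator already executing` when it is advanced concurrently. A common workaround is to wrap the generator in a lock. A minimal sketch follows; `ThreadSafeIterator` and `threadsafe_generator` are hypothetical helper names, not part of the Keras API.

```python
import functools
import threading


class ThreadSafeIterator:
    """Serializes next() calls so several threads can share one generator."""

    def __init__(self, iterator):
        self._iterator = iterator
        self._lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        # Only one thread at a time may advance the wrapped generator;
        # unguarded concurrent calls would raise
        # "ValueError: generator already executing".
        with self._lock:
            return next(self._iterator)


def threadsafe_generator(func):
    """Decorator: make a generator function return a ThreadSafeIterator."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return ThreadSafeIterator(func(*args, **kwargs))

    return wrapper
```

Decorating a generator such as `subprocess_routine` with `@threadsafe_generator` would then make it safe to share between threads. Note that the lock serializes the generator body itself, so this overlaps batch production with training but does not produce several batches in parallel.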
Since the problem is apparently related to the initialization of the subprocesses, I tried creating a new session in each subprocess:
```python
def subprocess_routine(weights):
    import keras.backend as K
    import tensorflow as tf

    sess = tf.Session()
    K.set_session(sess)
    model_inside = create_model()
    model_inside.set_weights(weights)
    while True:
        batch = np.random.rand(10, 10)
        prediction = model_inside.predict(batch)
        yield batch, prediction
```
It produces a variation on the same error message:

```
E tensorflow/stream_executor/cuda/cuda_driver.cc:1300] could not retrieve CUDA device count: CUDA_ERROR_NOT_INITIALIZED
```
So again, the initialization seems broken.
How can I run Keras both in my main process and in subprocesses spawned by multiprocessing?
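One direction worth considering, sketched here only under stated assumptions, is to keep every Keras/TensorFlow call in the main process and send only the CPU-heavy, Keras-free work to worker processes, so that no child ever needs CUDA. This assumes the CPU-heavy part can be separated from the `predict` call; `cpu_heavy_batch` and `training_batches` are hypothetical names, and the batch contents are random stand-in data.

```python
import random
from collections import deque
from concurrent.futures import Executor


def cpu_heavy_batch(seed, batch_size=10, n_features=10):
    """Hypothetical stand-in for the CPU-heavy part that needs no Keras.

    Defined at module level so a ProcessPoolExecutor can pickle it.
    """
    rng = random.Random(seed)
    return [[rng.random() for _ in range(n_features)] for _ in range(batch_size)]


def training_batches(predict_fn, executor: Executor, prefetch=4):
    """Yield (batch, prediction) pairs indefinitely.

    Batch construction is farmed out to `executor`; predict_fn (e.g. the
    inner model's predict) is only ever called in the current process.
    """
    seed = 0
    pending = deque()
    while True:
        # Keep `prefetch` batches in flight so the workers stay busy.
        while len(pending) < prefetch:
            pending.append(executor.submit(cpu_heavy_batch, seed))
            seed += 1
        # Take the oldest future first, so batches come out in seed order.
        batch = pending.popleft().result()
        yield batch, predict_fn(batch)
```

With a `concurrent.futures.ProcessPoolExecutor` as the executor and something like `lambda b: model_inside.predict(np.asarray(b))` as `predict_fn`, the generator could be passed to `fit_generator` with `use_multiprocessing=False`. If Keras then runs the generator on a worker thread, the `graph.as_default()` wrapper from the issue linked above may still be needed around `predict`. This has not been tested against a specific Keras/TensorFlow version.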