I'm trying to solve a binary-classification problem. I therefore created a model in Keras (with a TensorFlow backend) and trained the model on CPU. I saved the model in the TensorFlow SavedModel format using the Keras API. I'm using Kaggle Kernels with Python 3.7.6, Keras 2.4.3 and TensorFlow 2.3.0.
Here's the simplified code I used along with some mockup data (in reality I'm training my model on a GPU, but I believe the issue I face is independent of that fact):
# setup
import numpy as np
random_state = 0
np.random.seed(random_state)  # make the mockup data reproducible
# create mockup data
train_labels = np.random.randint(2, size=(1000))
validation_labels = np.random.randint(2, size=(200))
train_features = np.random.rand(1000, 49)
validation_features = np.random.rand(200, 49)
# create and train model
from keras.models import Sequential
from keras.layers import Dense, Dropout
dim = train_features.shape[1]
model = Sequential()
model.add(Dense(dim, input_dim=dim, activation='relu'))
model.add(Dropout(0.2, seed=random_state))
model.add(Dense(1, activation='sigmoid'))
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
model.summary()
epochs = 10
batch_size = 100
model.fit(train_features, train_labels, epochs=epochs, batch_size=batch_size, verbose=1, validation_data=(validation_features, validation_labels))
# evaluate model
from sklearn.metrics import confusion_matrix
prediction_labels = model.predict_classes(validation_features)
print(confusion_matrix(prediction_labels, validation_labels))
# save model
model.save('/kaggle/working/model')
I then load the model in a new session. When I make a prediction on a single CPU it works just fine. What I actually want to do, however, is to make predictions on all 4 available CPUs in parallel (in contrast to my mockup here, the actual problem I'm trying to solve is much more complicated and involves much more data). I tried to do this using multiprocessing like so:
# setup
import numpy as np
import pandas as pd
random_state = 0
# load model
from keras.models import load_model
model = load_model('../input/repro-model/model')
# create mockup test data
test_features = np.random.rand(1000, 49)
# make some test predictions on the entire test set and a single observation
model.predict_classes(test_features)
model.predict_classes(test_features[[0]])
# set up test dataframe for making predictions in parallel
test_df = pd.DataFrame(test_features)
seq_ids = []
for i in np.arange(1, 201):
    # each of the 200 sequences gets 5 rows, one per frame
    seq_id = [i] * 5
    seq_ids.append(seq_id)
frm_ids = [np.arange(1, 6)] * 200
# flatten so that every row of test_df carries a (seq_id, frm_id) pair
test_df['seq_id'] = [item for sublist in seq_ids for item in sublist]
test_df['frm_id'] = [item for sublist in frm_ids for item in sublist]
# test the setup
seq_id = 1
frm_id = 1
model.predict_classes(test_df[(test_df.seq_id == seq_id) & (test_df.frm_id == frm_id)].drop(['seq_id', 'frm_id'], axis=1))
# create function for making predictions
def make_prediction(model, data, seq_id, frm_id):
    print(seq_id)
    pred = model.predict_classes(data[(data.seq_id == seq_id) & (data.frm_id == frm_id)].drop(['seq_id', 'frm_id'], axis=1))
    return pred
# make test prediction
make_prediction(model, test_df, 1, 1)
# make predictions in parallel
from multiprocessing import Pool
import itertools
workers = 4
p = Pool(processes=workers)
seq_list, frm_list = np.arange(1, 201), np.arange(1, 6)
id_pair_list = list(itertools.product(seq_list, frm_list))
predictions = p.starmap(make_prediction, [(model, test_df, id_pair[0], id_pair[1]) for id_pair in id_pair_list])
p.close()
When I run the code above I get the following stacktrace and error:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-85-52e9e388daba> in <module>
8 id_pair_list = list(itertools.product(seq_list, frm_list))
9
---> 10 predictions = p.starmap(make_prediction, [(model, test_df, id_pair[0], id_pair[1]) for id_pair in id_pair_list])
11 p.close()
/opt/conda/lib/python3.7/multiprocessing/pool.py in starmap(self, func, iterable, chunksize)
274 `func` and (a, b) becomes func(a, b).
275 '''
--> 276 return self._map_async(func, iterable, starmapstar, chunksize).get()
277
278 def starmap_async(self, func, iterable, chunksize=None, callback=None,
/opt/conda/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
655 return self._value
656 else:
--> 657 raise self._value
658
659 def _set(self, i, obj):
/opt/conda/lib/python3.7/multiprocessing/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
429 break
430 try:
--> 431 put(task)
432 except Exception as e:
433 job, idx = task[:2]
/opt/conda/lib/python3.7/multiprocessing/connection.py in send(self, obj)
204 self._check_closed()
205 self._check_writable()
--> 206 self._send_bytes(_ForkingPickler.dumps(obj))
207
208 def recv_bytes(self, maxlength=None):
/opt/conda/lib/python3.7/multiprocessing/reduction.py in dumps(cls, obj, protocol)
49 def dumps(cls, obj, protocol=None):
50 buf = io.BytesIO()
---> 51 cls(buf, protocol).dump(obj)
52 return buf.getbuffer()
53
TypeError: can't pickle _thread.RLock objects
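For what it's worth, the pickling failure can be reproduced without the pool at all; this minimal check (my own addition, using the model loaded above) fails with the same error:
import pickle
# Pool.starmap pickles every argument in order to ship it to the worker processes
pickle.dumps(model)  # raises TypeError: can't pickle _thread.RLock objects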
After reading everything on this topic I found on SO (e.g., here, here, here and here) and GitHub (e.g., here, here and here) I figured that there is some issue with pickling my sequential Keras model for multiprocessing. I then tried to load the model within my prediction function and delete it after each predict call like so:
def make_prediction(data, seq_id, frm_id):
    print(seq_id)
    from keras.models import load_model
    model = load_model('../input/repro-model/model')
    pred = model.predict_classes(data[(data.seq_id == seq_id) & (data.frm_id == frm_id)].drop(['seq_id', 'frm_id'], axis=1))
    del model
    return pred
from multiprocessing import Pool
import itertools
workers = 4
p = Pool(processes=workers)
seq_list, frm_list = np.arange(1, 201), np.arange(1, 6)
id_pair_list = list(itertools.product(seq_list, frm_list))
predictions = p.starmap(make_prediction, [(test_df, id_pair[0], id_pair[1]) for id_pair in id_pair_list])
p.close()
I don't get an error anymore, but the process hangs on the starmap call once the first four predictions have been made, so there must be some remaining issue. Using model._make_predict_function() doesn't work either, as it seems to be deprecated. There are also quite a few unanswered posts on SO on this topic: here, here and here.
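For reference, here is the variant I was planning to try next: load the model once per worker via a Pool initializer and use the spawn start method, so the workers don't inherit TensorFlow state from the parent. This is only a sketch (init_worker and the module-level model are my own names, and in a notebook the functions may need to live in an importable module for spawn to find them); I don't know yet whether it avoids the hang:
import multiprocessing as mp

model = None  # populated once per worker by init_worker

def init_worker(model_path):
    # load the model once per worker process instead of once per task
    global model
    from keras.models import load_model
    model = load_model(model_path)

def make_prediction(data, seq_id, frm_id):
    return model.predict_classes(
        data[(data.seq_id == seq_id) & (data.frm_id == frm_id)]
        .drop(['seq_id', 'frm_id'], axis=1))

# 'spawn' starts fresh interpreters, so no TensorFlow locks or sessions
# are copied into the workers, which fork would do
ctx = mp.get_context('spawn')
p = ctx.Pool(processes=4, initializer=init_worker,
             initargs=('../input/repro-model/model',))
predictions = p.starmap(make_prediction,
                        [(test_df, s, f) for s, f in id_pair_list])
p.close()
p.join()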
Does anyone have an idea how I could parallelize making predictions with my sequential Keras model on CPU? That would be awesome, since I've been stuck on this issue for quite a while already. Thanks a lot!