Background
I have an application that generates strings of words, which are evaluated by a Keras model. Each string is processed, evaluated by the NN, and updated according to the model's output. Each string is updated thousands of times, and I have to do this for hundreds of strings.
What I want to do
I would like to parallelize this by running each string's processing in its own process (given enough cores). The issue is that I can't easily pass a model to each process (and even if I could, I would probably run out of memory), nor can I easily load the model in each process.
What I am thinking to do
Is there a way for me to run each string operation as a separate process, but have it call out to a process that has the model loaded, accepts data, and spits out a result (like an API call), so that the string process can continue processing? I have seen others use Keras in a REST scenario, but that is more of a web service setup. What would be the best way to communicate between the string-generation processes and the neural-network process? Is it better to use something like a queue and queue up inputs? Or maybe asyncio? A sketch of what I have in mind follows below.
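For illustration, here is a minimal sketch of the pattern I am describing, assuming one "model server" process that owns the model and talks to string workers over multiprocessing queues. All names here (model_server, string_worker, request_q) are my own placeholders, not from any library, and the random input is a stand-in for the real string preprocessing:

import multiprocessing as mp
import numpy as np

def model_server(request_q, reply_qs):
    # The model is created only in this process, so the workers never
    # import TensorFlow and only one copy of the model lives in memory.
    from tensorflow import keras
    from tensorflow.keras import layers
    model = keras.Sequential([layers.Dense(2, activation="relu"),
                              layers.Dense(1)])
    while True:
        msg = request_q.get()             # blocks until a worker sends data
        if msg is None:                   # sentinel: shut down the server
            break
        worker_id, data = msg
        reply_qs[worker_id].put(model.predict(data, verbose=0))

def string_worker(worker_id, request_q, reply_q):
    for step in range(3):                 # stand-in for thousands of updates
        data = np.random.rand(1, 2)       # stand-in for the processed string
        request_q.put((worker_id, data))  # "API call" to the model process
        result = reply_q.get()            # block until the prediction arrives
        print('worker {} step {} got {}'.format(worker_id, step, result))

if __name__ == '__main__':
    n_workers = 2
    request_q = mp.Queue()                # shared inbox for the model server
    reply_qs = [mp.Queue() for _ in range(n_workers)]  # one outbox per worker
    server = mp.Process(target=model_server, args=(request_q, reply_qs))
    server.start()
    workers = [mp.Process(target=string_worker,
                          args=(i, request_q, reply_qs[i]))
               for i in range(n_workers)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    request_q.put(None)                   # tell the server to exit
    server.join()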
EDIT
OKAY: It looks like we can spawn processes and pass messages to a single instance of a Keras model, WITHOUT having to pass a Keras object to each process or import the libraries inside them. The workers keep polling the shared dictionaries to see whether their entries are still None (yes, not the most elegant approach).
import multiprocessing as mp
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import random
import os

def gen_model():
    # Define Sequential model with 3 layers
    model = keras.Sequential(
        [
            layers.Dense(2, activation="relu", name="layer1"),
            layers.Dense(2, activation="relu", name="layer2"),
            layers.Dense(1, name="layer3"),
        ]
    )
    return model

def worker(une, e):  # preprocess an input and wait for its evaluation
    n = tf.expand_dims(tf.convert_to_tensor([random.randint(0, 10) for x in range(2)]), axis=0)
    pid = os.getpid()
    e[pid] = None             # register this worker in the evaluated dict
    une[pid] = [n]            # hand the input to the main process
    while e[pid] is None:     # keep looping until the main process writes a prediction
        pass                  # busy-wait; exits once e[pid] is filled in
    print('process {} input is {} value is: {}'.format(pid, n, e[pid]))
    # further process and complete
    une[pid] = None           # signal that this worker is done

if __name__ == '__main__':
    n_proc = 2
    m = mp.Manager()
    unevaluated = m.dict()    # dict of inputs awaiting evaluation, keyed by worker pid
    evaluated = m.dict()      # same but holding the nn's predictions
    # worker is passed as the Pool initializer, so it runs once in each process
    worker_pool = mp.Pool(n_proc, worker, (unevaluated, evaluated))  # n workers
    model = gen_model()
    while True:
        done = 0
        for k, v in unevaluated.items():
            if v is None:              # worker k has consumed its result and finished
                done += 1
                continue
            if evaluated[k] is None:   # only predict inputs not yet evaluated
                evaluated[k] = model.predict(v[0])
        if done >= n_proc:             # all workers have reported completion
            break
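One caveat on this design: the while e[pid] is None busy-wait spins at 100% CPU in every worker, and each check is an IPC round-trip to the Manager process. Swapping the shared dictionaries for multiprocessing.Queue with blocking get() calls, as in the sketch further up, would let the workers sleep until a prediction actually arrives.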