
Background

I have an application that generates strings of words which are evaluated by a Keras model. Each string is processed, evaluated by the NN, and updated according to the model's output. Each string is updated thousands of times, and I have to do this for hundreds of strings.

What I want to do

I would like to parallelize this by running each string's processing in its own process (given enough cores). The issue is that I can't easily pass a model to each process (and even if I could, I would probably run out of memory), nor can I easily load the model in each process.

What I am thinking to do

Is there a way to run each string operation as a separate process, but have it call out to a process that has the model loaded, accepts data, and returns a result (like an API call) so that the string process can continue? I have seen others use Keras behind a REST API, but that is more of a web-service scenario. What would be the best way to communicate between the string-generation processes and the neural-network process? Is it better to use something like a queue and queue up inputs, or maybe asyncio?

EDIT

OKAY: it looks like we can spawn processes and pass messages to a single instance of a Keras model (WITHOUT having to pass a Keras object or import the library inside each process). The workers keep checking whether the shared dictionaries are empty or filled with None (yes, not the most elegant).

import multiprocessing as mp
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import random
import os


def gen_model():
    # Define Sequential model with 3 layers
    model = keras.Sequential(
        [
            layers.Dense(2, activation="relu", name="layer1"),
            layers.Dense(2, activation="relu", name="layer2"),
            layers.Dense(1, name="layer3"),
        ]
    )
    return model


def worker(une, e):
    # Preprocess an input, hand it to the main process for evaluation,
    # then continue once the prediction has been written back.
    n = tf.expand_dims(tf.convert_to_tensor([random.randint(0, 10) for x in range(2)]), axis=0)
    pid = os.getpid()
    e[pid] = None
    orig_val = e[pid]
    une[pid] = [n]  # request evaluation of n
    while e[pid] is orig_val:  # busy-wait until the main process writes a prediction
        pass
    print('process {} input is {} value is: {}'.format(pid, n, e[pid]))
    # further process and complete
    une[pid] = None  # signal that this worker is done


if __name__ == '__main__':
    n_proc = 2
    m = mp.Manager()
    unevaluated = m.dict()  # inputs waiting for the NN, keyed by worker pid
    evaluated = m.dict()    # predictions written back by the main process
    worker_pool = mp.Pool(n_proc, worker, (unevaluated, evaluated))  # n workers
    model = gen_model()
    while True:
        count = 0  # recount finished workers on every pass
        for k, v in unevaluated.items():
            if v is None:  # this worker has already finished
                count += 1
                continue
            evaluated[k] = model.predict(v[0])
        if count >= n_proc:  # every worker is done
            break
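
One possible refinement (not part of the original code): the loop above calls predict once per waiting worker, but all pending inputs could be batched into a single call. A minimal sketch, assuming the dict-based setup above and that each stored input has shape (1, 2); evaluate_pending is a hypothetical helper name:

import numpy as np

def evaluate_pending(model, unevaluated, evaluated):
    # Gather every input that is still waiting for a prediction
    pending = {k: v for k, v in unevaluated.items() if v is not None}
    if not pending:
        return
    keys = list(pending.keys())
    # Stack the (1, 2) inputs into one (n_pending, 2) batch
    batch = np.concatenate([np.asarray(pending[k][0]) for k in keys], axis=0)
    preds = model.predict(batch)  # one predict() call instead of one per worker
    for i, k in enumerate(keys):
        evaluated[k] = preds[i:i + 1]  # keep the (1, 1) shape each worker expects

The main loop would then call evaluate_pending(model, unevaluated, evaluated) on each pass instead of predicting key by key.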


Kevin
  • Where is the bottleneck here -- is the string processing much slower than the NN evaluation? Some more context would be useful, too. Do you want to run all these tasks on the same machine, or would you like to be able to scale out to multiple machines? – Joseph Redfern Nov 02 '20 at 19:01
  • @JosephRedfern Yes, the string processing is the bottleneck (I run a separate simulation on the string after my NN predicts). – Kevin Nov 02 '20 at 19:02
  • @JosephRedfern The environment is a compute cluster where I can request cores and virtual machines, but there is overhead when requesting a new job, and since I only have hundreds of strings I would plan to run it all on the same machine. – Kevin Nov 02 '20 at 19:07

1 Answer


The easiest way of doing this will probably be to use a queue, as you suggest.

One possible configuration:

Define three queues:

  • Input Strings: strings that are awaiting processing
  • Processed Strings: strings that have been processed, but not evaluated
  • Evaluated Strings: strings that have been processed and evaluated (i.e. the results).

Define N processes for the String Processing stuff (perhaps using a Pool).

Define one process for the NN stuff.

Your string-processing method will take two queues as arguments -- input strings and processed strings. It will pop an item from the input strings queue (blocking until one becomes available, using .get() on the queue), process it, place the output in the processed strings queue (using .put()), and repeat this indefinitely (or until the input queue is exhausted).

Your evaluation method will also take two queues as arguments -- the processed strings queue and the evaluated strings queue -- and will operate similarly to the string-processing method (but will call your Keras stuff instead).

You could then have another process doing something with the evaluated strings queue, or could instead wait for the input strings queue to be empty and process the results in the main thread (one way to stop cleanly is sketched after the example below).

Of course, there are many fine details that are glossed over here, but assuming that you're operating on a single machine and don't want to scale out to multiple processing nodes (with a task queue like Celery), this would be a simple way and will probably require the least modification to your existing code. Here is an example showing how it may work:

import multiprocessing
import multiprocessing.managers
import time
import random
import os


def string_stuff(input_queue, output_queue):
    # you should maybe use a signal here, or check remaining count in queue
    while True:
        item = input_queue.get()

        processed = item.split() # do the processing

        # simulate some long running thing
        time.sleep(random.random() * 2)

        output_queue.put(processed)
        print(f"[string_stuff:{os.getpid()}] Putting: {processed}")

def nn_stuff(input_queue, output_queue):
    while True:
        item = input_queue.get()

        evaluated = ':'.join(item)

        # simulate some less long running thing
        time.sleep(random.random()/2)

        output_queue.put(evaluated)
        print(f"[nn_stuff:{os.getpid()}] Putting: {evaluated}")


def generate_random_word(length):
    return ''.join([chr(ord('A') + random.randint(0, 25)) for _ in range(length)])

def fix_broken_multiprocessing():
    # this is a really horrible bug in Python that for some reason
    # has gone unfixed for years. Fix came from: https://stackoverflow.com/a/50878600/492759

    # Backup original AutoProxy function
    backup_autoproxy = multiprocessing.managers.AutoProxy

    # Defining a new AutoProxy that handles unwanted key argument 'manager_owned'
    def redefined_autoproxy(token, serializer, manager=None, authkey=None,
                            exposed=None, incref=True, manager_owned=True):
        # Calling original AutoProxy without the unwanted key argument
        return backup_autoproxy(token, serializer, manager, authkey, exposed, incref)

    # Updating AutoProxy definition in multiprocessing.managers package
    multiprocessing.managers.AutoProxy = redefined_autoproxy


if __name__ == "__main__":

    fix_broken_multiprocessing()

    n_words = 1000
    words = [generate_random_word(random.randint(0, 10)) for _ in range(n_words)]

    m = multiprocessing.Manager()

    inputs = m.Queue()
    processed = m.Queue()
    evaluated = m.Queue()


    # populate our fake queue
    for word in words:
        inputs.put(word)
    

    string_pool = multiprocessing.Pool(4, string_stuff, (inputs, processed)) # 4 workers
    nn_process = multiprocessing.Process(target=nn_stuff, args=(processed, evaluated))
    nn_process.start()

    while True:
        result = evaluated.get()
        print(f"Just got: {result} back from the pipeline")
Joseph Redfern
  • I like your solution, but I am unsure why you cannot import Keras at the top of the file and then call a model's predict function. It seems that you have to instantiate the model within the process, which is not ideal, since we have to load the model (and the Keras library) inside the process to evaluate, otherwise it hangs. – Kevin Nov 04 '20 at 23:12
  • @Kevin sure, you can import Keras in nn_stuff rather than importing it globally. You could even skip the Keras process altogether if that was beneficial and just do it in the main process. – Joseph Redfern Nov 04 '20 at 23:40
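
For completeness, a minimal sketch of that suggestion: import and load Keras only inside nn_stuff, so the parent process never has to pickle a model. The model path and the assumption that each queue item is already a batched NumPy array are hypothetical.

def nn_stuff(input_queue, output_queue):
    # Deferred import: TensorFlow/Keras is only loaded in this process
    from tensorflow import keras

    model = keras.models.load_model("my_model.h5")  # hypothetical path

    while True:
        item = input_queue.get()  # assumed to already be a ready-to-predict NumPy batch
        output_queue.put(model.predict(item))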