
EDIT 2: The GitHub link below contains possible solutions to the problem of calling a TF model from a spawned process. They include eager execution and a dedicated server process that serves TF model predictions over HTTP requests. I wonder whether a custom server with requests wins any time compared to initializing global variables each time and calling tf.train.Server, but it seems to be the more elegant way.
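For concreteness, here is a minimal sketch of the dedicated-server idea, using only Python's stdlib http.server (this is an illustration, not the code from the repo; predict_interests() is a hypothetical helper wrapping the prediction code from step 4 below, and the port is arbitrary):

import json
from http.server import BaseHTTPRequestHandler, HTTPServer

# the model is loaded exactly once, when the server process starts
interests = init_interests_for_process()

class PredictHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers['Content-Length'])
        text = json.loads(self.rfile.read(length))['text']
        result = predict_interests(interests, text)  # hypothetical helper
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps(result).encode())

HTTPServer(('127.0.0.1', 8000), PredictHandler).serve_forever()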

I will investigate the memory leak, and if it is gone, I will close this question.

EDIT: Added a simple reproducible example of the problem:

https://github.com/hcl14/Tensorflow-server-launched-from-child-process


Background: I am running a TensorFlow server and connecting to it from 'forked' processes. Creating (and destroying) processes dynamically is essential for me: I moved a heavily loaded part of the code there because of a weird memory leak that is not visible to Python profilers (threads do not solve the issue). Therefore, I want the processes to initialize fast and start working immediately. Memory is freed only when a process is destroyed.

While experimenting, I found a solution in which the loaded model and graph are saved into a global variable, then picked up by the child process (which uses 'fork' mode by default), and then the server is called.

Problem: The strange thing for me is that, after loading the Keras models, I cannot lock the graph, which I do not expect to modify, and I need to run tf.global_variables_initializer() each time I open a new session in a child process. However, a dummy run in the main flow without any session creation works OK. I know that in this case TensorFlow uses the default session, but all the variables on the graph should be initialized after the model run, so I expected a new session to work OK with the previously defined graph.

Thus, I think that modifying the model forces Python to copy a lot of model data into the child process ('fork' mode is copy-on-write), which creates computational and memory overhead.
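For illustration, here is a minimal standalone sketch (independent of my legacy model) of why tf.global_variables_initializer() cannot be called after finalize() -- it tries to add a new op to the graph -- while an init op built before locking can still be run:

import tensorflow as tf

graph = tf.Graph()
with graph.as_default():
    x = tf.Variable(0.)
    # build the init op BEFORE locking the graph, so it lives on the graph
    init_op = tf.global_variables_initializer()

graph.finalize()  # lock the graph

with tf.Session(graph=graph) as sess:
    sess.run(init_op)  # OK: pre-built op, no graph modification needed
    # sess.run(tf.global_variables_initializer())  # RuntimeError:
    # "Graph is finalized and cannot be modified."
    print(sess.run(x))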

Please excuse the large amount of code. The model I use is legacy and a black box for me, so it is possible that my problem is related to it. The TensorFlow version is 1.2 (I cannot upgrade it; the model is not compatible), Python 3.6.5.

Also, maybe my solution is inefficient and there is a better one; I would be grateful for your advice.

My setup is the following:

1. TensorFlow server started in the main process:

Initialize the server:

def start_tf_server():
    import tensorflow as tf
    # tf_server_address is defined elsewhere in the module
    cluster = tf.train.ClusterSpec({"local": [tf_server_address]})
    server = tf.train.Server(cluster, job_name="local", task_index=0)
    server.join()  # block the process from exiting

In the main process:

p = multiprocessing.Process(target=start_tf_server)
p.daemon = True
p.start()  # this process never ends, unless the tf server crashes

# WARNING! Graph initialization must be done only after the TF server starts!
# Otherwise everything will hang.
# I suppose this is because another session would be
# created before the server one.

# init the model graph before branching processes
# share the graph in the current process scope
interests = init_interests_for_process()
global_vars.multiprocess_globals["interests"] = interests

2. init_interests_for_process() is the model initializer, which loads my legacy model and shares it via a global variable. I do one dummy model pass so that everything is initialized on the graph, and then I want to lock the graph. But locking does not work:

def init_interests_for_process():
    # Prevent errors on my GPU and disable tensorflow 
    # complaining about CPU instructions
    import os
    os.environ["CUDA_VISIBLE_DEVICES"] = ""
    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

    import joblib
    import tensorflow as tf

    from tensorflow.contrib.keras import models

    # get a handle to the default tensorflow graph
    graph = tf.get_default_graph()

    with graph.as_default():

        TOKENIZER = joblib.load(TOKENIZER_FILE)

        NN1_MODEL = models.load_model(NN1_MODEL_FILE)

        with open(NN1_CATEGORY_NAMES_FILE, 'r') as f:
            NN1_CATEGORY_NAMES = f.read().splitlines()

        NN2_MODEL = models.load_model(NN2_MODEL_FILE)

        with open(NN2_CATEGORY_NAMES_FILE, 'r') as f:
            NN2_CATEGORY_NAMES = f.read().splitlines()
        # global variable with all the data to be shared
        interests = {}

        interests["TOKENIZER"] = TOKENIZER
        interests["NN1_MODEL"] = NN1_MODEL
        interests["NN1_CATEGORY_NAMES"] = NN1_CATEGORY_NAMES
        interests["NN2_MODEL"] = NN2_MODEL
        interests["NN2_CATEGORY_NAMES"] = NN2_CATEGORY_NAMES
        interests['all_category_names'] = NN1_CATEGORY_NAMES + \
                                          NN2_CATEGORY_NAMES
        # Reconstruct a Python object from a file persisted with joblib.dump.
        interests["INTEREST_SETTINGS"] = joblib.load(INTEREST_SETTINGS_FILE)

        # dummy run to create graph
        x = tf.contrib.keras.preprocessing.sequence.pad_sequences(
                         TOKENIZER.texts_to_sequences("Dummy string"),
                         maxlen=interests["INTEREST_SETTINGS"]["INPUT_LENGTH"]
                         )
        y1 = NN1_MODEL.predict(x)
        y2 = NN2_MODEL.predict(x)

        # PROBLEM: I want, but cannot lock graph, as child process 
        # wants to run its own tf.global_variables_initializer()
        # graph.finalize()

        interests["GRAPH"] = graph

        return interests

3. Now I spawn the process (actually, the process is spawned from another process; the hierarchy is complicated):

from multiprocessing import Process, Queue

def foo(q):
    result = call_function_which_uses_interests_model(some_data)
    q.put(result)
    return  # I've read this is essential for destroying local variables

q = Queue()
p = Process(target=foo, args=(q,))
p.start()
result = q.get()  # retrieve data BEFORE join(): a large result left in the
                  # queue can otherwise deadlock join()
p.join()

4. And inside this process I call the model:

# retrieve model from global variable
interests = global_vars.multiprocess_globals["interests"]

tokenizer = interests["TOKENIZER"]
nn1_model = interests["NN1_MODEL"]
nn1_category_names = interests["NN1_CATEGORY_NAMES"]
nn2_model = interests["NN2_MODEL"]
nn2_category_names = interests["NN2_CATEGORY_NAMES"]
input_length = interests["INTEREST_SETTINGS"]["INPUT_LENGTH"]

# retrieve graph
graph = interests["GRAPH"]

# open a session pointing at the server
logger.debug('Trying tf server at ' + 'grpc://' + tf_server_address)
sess = tf.Session('grpc://' + tf_server_address, graph=graph)

# PROBLEM: I need to run the variables initializer:
sess.run(tf.global_variables_initializer())

tf.contrib.keras.backend.set_session(sess)

# finally, make a call to server:
with sess.as_default():        
    x = tf.contrib.keras.preprocessing.sequence.pad_sequences(
                            tokenizer.texts_to_sequences(input_str),
                            maxlen=input_length)
    y1 = nn1_model.predict(x)
    y2 = nn2_model.predict(x)

Everything works OK if I don't lock the graph and run the variable initializer each time a new process is spawned. (Except that there is a memory leak of about 30-90 MB per call, not visible to Python memory profilers.) When I try to lock the graph, I get errors about uninitialized variables:

FailedPreconditionError (see above for traceback): 
Attempting to use uninitialized value gru_1/bias
       [[Node: gru_1/bias/read = Identity[T=DT_FLOAT, _class=["loc:@gru_1/bias"],
       _device="/job:local/replica:0/task:0/cpu:0"](gru_1/bias)]]

Thanks in advance!

Slowpoke
  • Have you tried to initialize the variables in a session and use the same session in the subprocess? Do the variables inside the process change during the execution? – Guilherme Uzeda Oct 22 '18 at 19:55
  • Why can't you call the initializers? – Matthieu Brucher Oct 23 '18 at 11:29
  • @MatthieuBrucher 1) I want to speed up execution, 2) it is strange that I need to call initializers and cannot just lock the graph after the dummy run, which worked for threads, 3) I have a memory leak of about 30-90 MB for each model run via a spawned process, which I suppose might be caused by that – Slowpoke Oct 23 '18 at 12:01
  • @GuilhermeUzeda A session is not fork-safe; in this case TensorFlow gets stuck on some lock. Or do you mean `subprocess` can deal with it? https://github.com/tensorflow/tensorflow/issues/2448 https://stackoverflow.com/questions/37874838/forking-a-python-process-after-loading-tensorflow https://stackoverflow.com/questions/36610290/tensorflow-and-multiprocessing-passing-sessions – Slowpoke Oct 23 '18 at 12:06
  • There should not be a memory leak due to initializers. How do you know you have this memory leak? Is it caught by heaptrack or massif? Or is it top? If it's top, then you may not actually have a memory leak. – Matthieu Brucher Oct 23 '18 at 12:06
  • @MatthieuBrucher It's in top, and the program crashes after RAM is fully consumed. – Slowpoke Oct 23 '18 at 12:07
  • So each python subprocess has a memory leak? Aren't they closed at some point? – Matthieu Brucher Oct 23 '18 at 12:09
  • I suppose you could avoid the global init if you provide proper values for all variables. I still think that you would get the "memory leak" even without the global init. – Matthieu Brucher Oct 23 '18 at 12:12
  • @MatthieuBrucher `join()` is called. Nevertheless, I suspect that doing the initialization makes the Python process copy changed model data in RAM and might create computational overhead ("fork" mode). The speed is satisfactory at the moment, but it would be good to have it faster. – Slowpoke Oct 23 '18 at 12:13
  • @MatthieuBrucher The problem is that this particular model is a black box for me. I can print variable names, of course, but I am doing a dummy run (`predict()`) which should initialize everything on the graph, which is then passed to the process. It worked for threads. What is wrong this time? – Slowpoke Oct 23 '18 at 12:19
  • @MatthieuBrucher This worked for threads https://stackoverflow.com/questions/46725323/keras-tensorflow-exception-while-predicting-from-multiple-threads/46757715#46757715 I essentially wanted to do the same trick for processes. – Slowpoke Oct 23 '18 at 12:23
  • 1
    Have you considered TensorFlow Serving? https://www.tensorflow.org/serving/ Generally you'd want to cache Sessions, which I believe is the strategy TF Serving uses. You could also go the other direction and `tf.enable_eager_execution()`, which eliminates the need for Sessions altogether. But if you really want to create and destroy Sessions, you could replace variables in the graph with constants (["freeze" it](https://www.tensorflow.org/extend/tool_developers/#freezing)). – Allen Lavoie Oct 25 '18 at 17:05
  • @AllenLavoie Thank you very much for your suggestion! I was able to create code for the eager execution scenario on TensorFlow 1.11 (GitHub updated). But my legacy model needs TensorFlow 1.2, and I have not yet succeeded in loading it with newer versions of TF to use eager execution. – Slowpoke Oct 27 '18 at 19:03
  • @AllenLavoie I have implemented a custom server which serves model predictions via HTTP requests (GitHub updated). I will check it for the memory leak before closing the question, but can you please expand your comment into an answer, so that I will have a good one to accept? – Slowpoke Oct 28 '18 at 15:40

2 Answers


Have you considered TensorFlow Serving? https://www.tensorflow.org/serving/

Generally you'd want to cache Sessions, which I believe is the strategy TF Serving uses. That will be by far the best experience for deploying a TF model into a datacenter.

You could also go the other direction and call tf.enable_eager_execution(), which eliminates the need for Sessions. Variables still get initialized, although it happens as soon as the Python variable objects are created.
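A minimal sketch of the eager route (assuming TF 1.11, the version mentioned in the comments; tf.enable_eager_execution() does not exist in the asker's TF 1.2, and the Dense layer is just a stand-in model):

import numpy as np
import tensorflow as tf

tf.enable_eager_execution()  # must be called before any other TF op

layer = tf.keras.layers.Dense(3)  # variables are created on the first call
x = np.zeros((1, 10), dtype=np.float32)
y = layer(x)  # executes immediately, no Session involved
print(y.numpy())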

But if you really want to create and destroy Sessions, you could replace variables in the graph with constants ("freeze" it). I'd also consider disabling graph optimizations in this case, as the first session.run call with a new set of feeds and fetches will by default spend some time optimizing the graph (configured through a RewriterConfig inside a GraphOptions proto).
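A hedged sketch of the freezing idea, using the TF 1.x graph_util API and the Keras loading code from the question (the model path is hypothetical):

import tensorflow as tf
from tensorflow.python.framework import graph_util
from tensorflow.contrib.keras import models, backend as K

model = models.load_model('nn1_model.h5')  # hypothetical path
sess = K.get_session()  # the session that holds the loaded weights

# bake current variable values into constants; only nodes reachable
# from the listed outputs are kept
frozen_def = graph_util.convert_variables_to_constants(
    sess, sess.graph.as_graph_def(),
    [t.op.name for t in model.outputs])

# a graph built from the frozen GraphDef contains no variables, so it can
# be finalized and used from short-lived sessions without any initializer
graph = tf.Graph()
with graph.as_default():
    tf.import_graph_def(frozen_def, name='')
graph.finalize()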

(Expanded from a comment on the question)

Allen Lavoie

I am not sure if this will help you, but one needs to know that in TensorFlow, variables are only initialized for a given Session. A variable needs to be initialized in each Session it is used in -- even in the simplest possible scenario:

import tensorflow as tf

x = tf.Variable(0.)

with tf.Session() as sess:
    tf.global_variables_initializer().run()
    # x is initialized -- no issue here
    x.eval()

with tf.Session() as sess:
    x.eval()
    # Error -- x was never initialized in this session, even though
    # it has been initialized before in another session
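Accordingly, a minimal fix for the second block is to run the initializer again in the new Session (or to keep a single Session alive across calls):

with tf.Session() as sess:
    tf.global_variables_initializer().run()  # initialize again for this session
    x.eval()  # OK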
P-Gn
  • Please excuse me for the late response. After your answer I tried to understand why the threaded example worked for me, and found that the same session was apparently shared among the threads, and tf.Session is [thread-safe](https://stackoverflow.com/questions/38694111). I've added an example to the GitHub repo which shows that, indeed, for a new session the variables are uninitialized even with a locked graph. However, I unfortunately cannot use threads. [This answer](https://stackoverflow.com/questions/46725323/) briefly mentions possible memory leaks with an unfinalized graph, which might be what I am experiencing now. – Slowpoke Oct 24 '18 at 15:41
  • Can there be a solution using a frozen model somehow across processes? How can I initialize variables once and store their values, as was suggested above? – Slowpoke Oct 24 '18 at 15:41