
I have a program that is trying to predict email conversion for every email I send in a week (so, usually 7 sends). The output is 7 different files with the prediction scores for each customer. Running these serially can take close to 8 hours, so I have tried to parallelize them with multiprocessing. This speeds things up very well, but I've noticed that after a process finishes it seems to hold onto its memory, until there is none left and one of the processes gets killed by the system without completing its task.

I've based the following code on the 'manual pool' example in this answer, as I need to limit the number of processes that start at once due to memory constraints. What I would like is for each process to release its memory back to the system as soon as it finishes, freeing up space for the next worker.
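
For illustration, the behaviour I'm after is roughly what the standard multiprocessing.Pool gives you with maxtasksperchild=1, where each worker process exits after a single task and the operating system reclaims its memory. A minimal sketch of that pattern (not my actual code; run_all_keys is just a hypothetical wrapper around the work_loop and training_dict names used below):

from multiprocessing import Pool

def run_all_keys(keys):
    # maxtasksperchild=1: each child handles one task and is then replaced,
    # so its memory goes back to the OS when it exits.
    pool = Pool(processes=4, maxtasksperchild=1)
    try:
        pool.map(work_loop, keys)
    finally:
        pool.close()
        pool.join()

# run_all_keys(training_dict.keys())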

Below is the code that handles concurrency:

from multiprocessing import Manager, Process
import itertools

def work_controller(in_queue, out_list):
    while True:
        key = in_queue.get()
        print key

        if key is None:  # sentinel telling the worker to exit
            return

        work_loop(key)
        out_list.append(key)

if __name__ == '__main__':

    num_workers = 4
    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)
    processes = []

    for i in xrange(num_workers):
        p = Process(target=work_controller, args=(work,results))
        processes.append(p)
        p.start()

    iters = itertools.chain([key for key in training_dict.keys()])
    for item in iters:
        work.put(item)

    for p in processes:
        print "Joining Worker"
        p.join()

Here is the actual work code, if that is of any help:

import datetime
import pickle

import pandas as pd
import imbalance  # project-specific module providing ImbalanceClassifier

def work_loop(key):
    with open('email_training_dict.pkl', 'rb') as f:
        training_dict = pickle.load(f)
    df_test = pd.DataFrame.from_csv(test_file)
    outdict = {}
    target = 'is_convert'

    df_train = train_dataframe(key)
    features = data_cleanse(df_train,df_test)

    # MAIN PREDICTION
    print 'Start time: {}'.format(datetime.datetime.now()) + '\n'

    # train/test by mailer
    X_train = df_train[features]
    X_test = df_test[features]
    y_train = df_train[target]

    # run model fit
    clf = imbalance.ImbalanceClassifier()

    clf = clf.fit(X_train, y_train)
    y_hat = clf.predict(X_test)

    outdict[key] = clf.y_vote
    print outdict[key]
    print 'Time Complete: {}'.format(datetime.datetime.now()) + '\n'
    with open(output_file,'wb') as f:
        pickle.dump(outdict,f)
  • How sure are you that the memory leak is due to multiprocessing? Do you still see an increase in memory consumption if you call your `work_loop` function serially? – ali_m Jun 10 '15 at 21:46
  • @ali_m Thanks for your comment. You are correct there is some kind of memory leak. Interestingly (or not?), the program won't chew up a new block of memory for each iteration - it only takes more as needed. So, for instance, if one iteration of `work_loop` takes 1gb of memory, and the next takes 500mb, the total the program is using is 1gb. But if the next iteration takes 2gb, then the program keeps that entire 2gb. I'm still searching for a way to release this back to the system – jpavs Jun 17 '15 at 22:29
    That actually sounds like normal behaviour to me. When a Python object goes out of scope (or if you delete it manually) the memory it was allocated won't be released until the object is garbage collected. Exactly *when* this happens is rather unpredictable, but you can force garbage collection using `gc.collect()`. However, even after the object has been garbage collected the memory that's just been freed may not be reclaimed by the OS, so if you're watching the memory usage of your Python process you should not expect it to go down immediately. – ali_m Jun 17 '15 at 22:56
  • If the memory usage of your Python process increased by some increment every time you ran `work_loop()`, then I would suspect that you had a memory leak in `work_loop`. – ali_m Jun 17 '15 at 22:59
  • At any rate I think you're unlikely to get a very satisfactory answer to your question as it stands, since it's not possible to reproduce the problem without the rest of your code and the input data. You'd have a much better chance if you could trim down your code into an [MCVE](http://stackoverflow.com/help/mcve) (there's a good chance that you'll be able to identify the cause yourself during the process). – ali_m Jun 18 '15 at 23:30
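
To illustrate the gc.collect() suggestion from the comments above, here is a minimal sketch (assuming the same work_controller / work_loop names used in the question) that forces a collection pass after every task; even so, the allocator may not hand the freed memory straight back to the OS:

import gc

def work_controller(in_queue, out_list):
    while True:
        key = in_queue.get()
        if key is None:  # sentinel: stop the worker
            return
        work_loop(key)
        out_list.append(key)
        gc.collect()  # force a garbage-collection pass after each task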

1 Answer


I'm assuming that, like the example you linked, you are using Queue.Queue() as your queue object. This is a blocking queue, which means a call to queue.get() will either return an element or wait/block until one is available. Try changing your work_controller function to the version below:

import Queue  # for the Queue.Empty exception raised by a non-blocking get

def work_controller(in_queue, out_list):
    while True:  # return when the queue is empty
        try:
            key = in_queue.get(False)  # pass False so the get does not block
        except Queue.Empty:
            return
        print key

        work_loop(key)
        out_list.append(key)

While the above solves the blocking issue, it gives rise to another: at the start of the workers' lives there are no items in in_queue, so the workers will end immediately.

To solve this, I suggest you add a flag, shared between the parent and the workers, to indicate whether it is okay to terminate.

import Queue  # for the Queue.Empty exception

def work_controller(in_queue, out_list, ok_to_end):
    while True:  # return once the queue is empty and the flag is set
        try:
            key = in_queue.get(False)  # pass False so the get does not block
        except Queue.Empty:
            if ok_to_end.is_set():  # consult the flag before ending
                return
            continue  # queue is momentarily empty, but more work may be coming
        print key

        work_loop(key)
        out_list.append(key)

if __name__ == '__main__':

    num_workers = 4
    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)
    ok_to_end = manager.Event()  # termination flag, shared with the workers via the manager
    processes = []

    for i in xrange(num_workers):
        p = Process(target=work_controller, args=(work, results, ok_to_end))
        processes.append(p)
        p.start()

    iters = itertools.chain([key for key in training_dict.keys()])
    for item in iters:
        work.put(item)

    ok_to_end.set()  # set the termination flag only after the queue has been filled

    for p in processes:
        print "Joining Worker"
        p.join()
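
For completeness, another way to let the workers stop cleanly, keeping the blocking get and the if key is None check from the question's original work_controller, is to enqueue one sentinel per worker once all the real keys have been queued. A minimal sketch reusing the names from the code above:

for item in training_dict.keys():
    work.put(item)

for _ in xrange(num_workers):
    work.put(None)  # one sentinel per worker; each worker reads a None and returns

for p in processes:
    p.join()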