
I am using gensim word2vec to return the most similar texts from a corpus for a given query text. Here are the relevant lines of code that set things up:

model = gensim.models.KeyedVectors.load_word2vec_format('/users/myuser/method_approaches/google_news_requirements/GoogleNews-vectors-negative300.bin.gz', binary=True)
instance = WmdSimilarity(processed_set, model, num_best=10)

And then I have this very simple worker function, which queries the similarity instance and puts the result on the queue when run under multiprocessing:

def get_most_similar_for_a_given_text(instance,text,output):
    i=instance[text]
    output.put(i)

And then I have a batch multiprocessing script:

import multiprocessing as mp
import numpy as np
import tqdm

def get_most_similar_for_all_texts_in_set(processed_set, instance):
    output = mp.Queue()
    # Setup a list of processes that we want to run
    processes = [mp.Process(target=get_most_similar_for_a_given_text, args=(instance, text, output)) for text in processed_set]
    num_cores = mp.cpu_count()
    Scaling_factor_batch_jobs = 3
    number_of_jobs = len(processes)
    num_jobs_per_batch = num_cores * Scaling_factor_batch_jobs
    num_of_batches = int(number_of_jobs // num_jobs_per_batch)+1 
    print('\n'+'Running batches now...')
    for i in tqdm.tqdm(range(num_of_batches)):
        # the floor/ceiling indices look like they double count, with ranges being 0:24, 24:48, 48:72, etc., but Python slices exclude the end index, so each job runs exactly once
        if i<num_of_batches-1: # true for all but last one
            floor_job = int(i * num_jobs_per_batch) # cast to int: slicing with a float raises a TypeError
            ceil_job  = int(floor_job + num_jobs_per_batch)
            # Run processes
            for p in processes[floor_job : ceil_job]:
                p.start()
            for p in processes[floor_job : ceil_job]:
                p.join()
            for p in mp.active_children():
                p.terminate()
            print(floor_job,ceil_job)
        else: # true only for the last batch, which picks up the leftover jobs that don't fill a whole batch (num_of_batches is rounded up)
            floor_job = int(i * num_jobs_per_batch)
            # Run processes
            for p in processes[floor_job:]:
                p.start()
            for p in processes[floor_job:]:
                p.join()
            for p in mp.active_children():
                p.terminate()
            print(floor_job,len(processes))
    # Get process results from the output queue
    results = [output.get() for p in tqdm.tqdm(processes)]
    np.save('/users/josh.flori/method_approaches/numpy_files/wmd_results_list.npy', results)
    return results

What actually happens when I run this is that batches 1:4 run fine. Those batches account for texts 0:96 in processed_set, which is the list I am looping through. But when it gets to the 5th batch, texts 96:120, it appears to simply stop processing; it does not fail, quit, or crash. Visually it looks like it is still running, but it is not: my CPU activity drops back down and the progress bar stops moving.

I visually inspected texts 96:120 from processed_set and nothing looked odd. I then ran the get_most_similar_for_a_given_text function on those texts in isolation, outside of the multiprocessing function, and they completed just fine.

Anyway, to reiterate, it always hangs at batch 5. Does anyone have any insight here? I am not very familiar with how multiprocessing works.

Thanks again

Josh Flori
  • Do your processes complete? Do you get past join() after the first set? – Hannu Mar 22 '18 at 12:49
  • Yes, the first 4 batches successfully join – Josh Flori Mar 22 '18 at 13:01
  • What is the point of your active_children and terminate loop? Could it be stuck there? If you join a child process, it is removed from the process table when the task completes; join already takes care of that. There should not be any subprocesses running at that point, so that part looks obsolete anyway. – Hannu Mar 22 '18 at 13:12
  • I'm not sure what the point is. I adopted the batch optimization from this post without fully understanding that part https://stackoverflow.com/questions/11996632/multiprocessing-in-python-while-limiting-the-number-of-running-processes .... I removed the terminations and the process still hung up on the 5th batch, so that doesn't seem to be the issue – Josh Flori Mar 22 '18 at 13:42
  • Can you run it in the debugger and inspect where it is when nothing more happens? – Arndt Jonasson Mar 22 '18 at 14:35
  • I have never used the debugger. After reading up on it and messing around I cannot quite figure it out. I tried python -m pdb script.py and hitting 'n' to step my way through but it just gets stuck on one line and idk why, and I'm not sure which other methods to try. I'm looking what neeraj dixit mentioned now – Josh Flori Mar 22 '18 at 15:28

1 Answer


This could be because you are using a queue. A child process blocks in put if the queue's underlying pipe is full, and it will not exit until that data has been flushed, so the parent's join deadlocks unless the queue is drained first. Try testing with a very small processed_set and see if all the jobs complete. If they do, you might want to use a pipe for a large number of results.
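To illustrate the deadlock-free pattern (the worker and data below are placeholders, not the WMD code from the question), the multiprocessing docs recommend draining the queue before joining the producers:

```python
import multiprocessing as mp

def square(n, output):
    # put() blocks once the queue's internal pipe is full, so the
    # parent must drain the queue before (or while) joining
    output.put(n * n)

if __name__ == '__main__':
    output = mp.Queue()
    processes = [mp.Process(target=square, args=(i, output)) for i in range(8)]
    for p in processes:
        p.start()
    # drain first: one get() per process lets every child flush and exit...
    results = [output.get() for _ in processes]
    # ...and only then is it safe to join
    for p in processes:
        p.join()
    print(sorted(results))
```

Joining before the get() calls is exactly the pattern in the question, and it hangs once enough results have been queued to fill the pipe.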

  • Yes, it works with a processed_set size < about 96, which is about where the main script hangs. I looked up pipe usage online and tried it out, but I am getting "too many open processes" as in this link https://stackoverflow.com/questions/47426410/too-many-open-processes-when-using-multiprocessing-how-close-earlier-processes?rq=1 and am not familiar enough to get much further beyond that right now – Josh Flori Mar 22 '18 at 16:44
  • Looks like you are creating too many processes. You should have no more processes than the number of CPUs, and create a pipe for each process; call recv on the pipe and save the result. I was working on a similar requirement and ended up creating [this library](https://pypi.python.org/pypi/pybatch), which might be useful to you – Neeraj Dixit Mar 23 '18 at 09:19
  • If you are getting "too many open processes" errors, I suggest redesigning your app to use multiprocessing.Pool or concurrent.futures.ProcessPoolExecutor. This appears to be a very typical case for a pool and would give you a performance boost as well: right now you submit a batch and wait until all its tasks are completed, whereas a pool submits the next task as soon as a worker becomes available. – Hannu Mar 23 '18 at 10:36
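A minimal sketch of the Pool approach suggested above, with a hypothetical most_similar worker standing in for the instance[text] lookup:

```python
import multiprocessing as mp

def most_similar(text):
    # stand-in for instance[text]; in the real code the WmdSimilarity
    # lookup would go here (the instance must be reachable by the workers,
    # e.g. built once per worker, since it is expensive to pickle)
    return text.upper()

if __name__ == '__main__':
    processed_set = ['alpha', 'beta', 'gamma']
    # the pool keeps cpu_count() workers busy and collects the results
    # in input order, so there is no manual batching and no shared
    # Queue to deadlock on
    with mp.Pool(processes=mp.cpu_count()) as pool:
        results = pool.map(most_similar, processed_set)
    print(results)  # ['ALPHA', 'BETA', 'GAMMA']
```

Unlike the fixed-size batches in the question, the pool hands the next text to whichever worker finishes first, so slow texts don't stall a whole batch.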