I am using gensim word2vec to return the most similar text from a corpus matching the query text. For instance, here are some relevant lines of code that start things off:
import gensim
from gensim.similarities import WmdSimilarity

model = gensim.models.KeyedVectors.load_word2vec_format('/users/myuser/method_approaches/google_news_requirements/GoogleNews-vectors-negative300.bin.gz', binary=True)
instance = WmdSimilarity(processed_set, model, num_best=10)
And then I have this very simple function, which queries the similarity instance; it is what each worker process runs:
def get_most_similar_for_a_given_text(instance, text, output):
    # Query the WmdSimilarity index and push the num_best matches onto the queue
    i = instance[text]
    output.put(i)
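Just to show the worker's contract in isolation, here is a toy sketch in which a plain dict stands in for the WmdSimilarity index and `queue.Queue` stands in for `mp.Queue` (the query text and lookup values are made up):

```python
import queue

def get_most_similar_for_a_given_text(instance, text, output):
    # instance[text] returns the num_best most similar (doc_index, score) pairs
    i = instance[text]
    output.put(i)

# Hypothetical stand-in for the WmdSimilarity index: a plain dict lookup
toy_instance = {"some query text": [(0, 0.93), (4, 0.87)]}

out = queue.Queue()
get_most_similar_for_a_given_text(toy_instance, "some query text", out)
result = out.get()
print(result)  # → [(0, 0.93), (4, 0.87)]
```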
And then I have a batch multiprocessing function that fans the queries out:
import multiprocessing as mp
import numpy as np
import tqdm

def get_most_similar_for_all_texts_in_set(processed_set, instance):
    output = mp.Queue()
    # Set up the list of processes that we want to run, one per text
    processes = [mp.Process(target=get_most_similar_for_a_given_text, args=(instance, text, output)) for text in processed_set]
    num_cores = mp.cpu_count()
    Scaling_factor_batch_jobs = 3
    number_of_jobs = len(processes)
    num_jobs_per_batch = num_cores * Scaling_factor_batch_jobs
    num_of_batches = int(number_of_jobs // num_jobs_per_batch) + 1
    print('\n' + 'Running batches now...')
    for i in tqdm.tqdm(range(num_of_batches)):
        # Although consecutive ranges share a boundary (0:24, 24:48, 48:72, ...),
        # nothing is double counted: Python slices are half-open, so each index
        # lands in exactly one batch
        if i < num_of_batches - 1:  # true for all but the last batch
            floor_job = int(i * num_jobs_per_batch)  # int() so the slice indices are never floats
            ceil_job = int(floor_job + num_jobs_per_batch)
            # Run this batch of processes
            for p in processes[floor_job:ceil_job]:
                p.start()
            for p in processes[floor_job:ceil_job]:
                p.join()
            for p in mp.active_children():
                p.terminate()
            print(floor_job, ceil_job)
        else:  # last batch: pick up the remainder left over by the floor division in the num_of_batches formula
            floor_job = int(i * num_jobs_per_batch)
            # Run the remaining processes
            for p in processes[floor_job:]:
                p.start()
            for p in processes[floor_job:]:
                p.join()
            for p in mp.active_children():
                p.terminate()
            print(floor_job, len(processes))
    # Get process results from the output queue
    results = [output.get() for p in tqdm.tqdm(processes)]
    np.save('/users/josh.flori/method_approaches/numpy_files/wmd_results_list.npy', results)
    return results
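For what it's worth, the slice arithmetic can be checked on its own, with no gensim or multiprocessing involved. A toy run with 100 jobs and 24 jobs per batch (both numbers made up) shows that each job lands in exactly one batch, because Python slices are half-open:

```python
number_of_jobs = 100     # hypothetical: stands in for len(processes)
num_jobs_per_batch = 24  # hypothetical: stands in for num_cores * scaling factor
num_of_batches = int(number_of_jobs // num_jobs_per_batch) + 1  # 5

jobs = list(range(number_of_jobs))
batches = []
for i in range(num_of_batches):
    floor_job = int(i * num_jobs_per_batch)
    if i < num_of_batches - 1:
        ceil_job = int(floor_job + num_jobs_per_batch)
        batches.append(jobs[floor_job:ceil_job])  # 0:24, 24:48, 48:72, 72:96
    else:
        batches.append(jobs[floor_job:])          # 96:100 picks up the remainder

print([len(b) for b in batches])  # → [24, 24, 24, 24, 4]

# Every job appears exactly once across all batches
flat = [j for b in batches for j in b]
assert flat == jobs
```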
What actually happens when I run this: batches 1 through 4 run fine, covering texts 0:96 of processed_set (the set I am looping over). But when it gets to the 5th batch, texts 96:120, it appears to simply stop processing; it does not fail, quit, or crash. Visually it looks like it is still running, but it is not: my CPU activity drops back down and the progress bar stops moving.
I visually inspected texts 96:120 from processed_set and nothing looked unusual. I also ran get_most_similar_for_a_given_text on those same texts in isolation, outside of the multiprocessing function, and they completed just fine.
To reiterate: it always hangs at batch 5. Does anyone have any insight? I am not very familiar with how multiprocessing works.
Thanks again