I have the following piece of code:
import sentence_transformers
import multiprocessing
from tqdm import tqdm
from multiprocessing import Pool
import numpy as np
embedding_model = sentence_transformers.SentenceTransformer('sentence-transformers/all-mpnet-base-v2')
data = [[100227, 7382501.0, 'view', 30065006, False, ''],
[100227, 7382501.0, 'view', 57072062, True, ''],
[100227, 7382501.0, 'view', 66405922, True, ''],
[100227, 7382501.0, 'view', 5221475, False, ''],
[100227, 7382501.0, 'view', 63283995, True, '']]
df_text = dict()
df_text[7382501] = {'title': 'The Geography of the Internet Industry, Venture Capital, Dot-coms, and Local Knowledge - MATTHEW A. ZOOK', 'abstract': '23', 'highlight': '12'}
df_text[30065006] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[57072062] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[66405922] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[5221475] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[63283995] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
# Define the function to be executed in parallel
def process_data(chunk):
    results = []
    for row in chunk:
        print(row[0])
        work_id = row[1]
        mentioning_work_id = row[3]
        print(work_id)
        if work_id in df_text and mentioning_work_id in df_text:
            title1 = df_text[work_id]['title']
            title2 = df_text[mentioning_work_id]['title']
            embeddings_title1 = embedding_model.encode(title1, convert_to_numpy=True)
            embeddings_title2 = embedding_model.encode(title2, convert_to_numpy=True)
            similarity = np.matmul(embeddings_title1, embeddings_title2.T)
            results.append([row[0], row[1], row[2], row[3], row[4], similarity])
        else:
            continue
    return results
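# (Note: as far as I know, all-mpnet-base-v2 L2-normalizes its embeddings, so the
# dot product above is effectively cosine similarity; the explicit equivalent
# would be sentence_transformers.util.cos_sim(embeddings_title1, embeddings_title2).)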
# Define the number of CPU cores to use
num_cores = multiprocessing.cpu_count()
# Split the data into chunks
chunk_size = len(data) // num_cores
chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]
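# (Aside: with the toy data above and more than 5 cores, chunk_size would be 0 and
# range() would raise "ValueError: range() arg 3 must not be zero"; a guard like
#   chunk_size = max(1, len(data) // num_cores)
# avoids that, though I don't think it explains the hang described below.)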
# Create a pool of worker processes
pool = multiprocessing.Pool(processes=num_cores)
results = []
with tqdm(total=len(chunks)) as pbar:
    for i, result_chunk in enumerate(pool.map(process_data, chunks)):
        # Update the progress bar
        pbar.update()
        # Add the results to the list
        results += result_chunk
# Keep the combined results
final_result = results
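As far as I understand, pool.map blocks until every chunk is finished, so the progress bar above only really moves once everything is done. A streaming variant along these lines (using imap_unordered, which I believe yields result chunks as they complete) is what I'd expect to tick per chunk, though I don't know whether it would behave any differently with respect to the hang I describe below:

results = []
with multiprocessing.Pool(processes=num_cores) as pool:
    with tqdm(total=len(chunks)) as pbar:
        for result_chunk in pool.imap_unordered(process_data, chunks):
            results += result_chunk
            pbar.update()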
I am running this code on Amazon SageMaker, and it runs just fine on an instance with 2 CPUs: I get the progress bar and everything. But I'd like to run it on a larger instance with more CPUs, and there it just hangs and makes no progress at all. When I finally interrupt the kernel, I get this error:
---------------------------------------------------------------------------
KeyboardInterrupt Traceback (most recent call last)
<ipython-input-18-19449c86abd3> in <module>
1 results = []
2 with tqdm(total=len(chunks)) as pbar:
----> 3 for i, result_chunk in enumerate(pool.map(process_data, chunks)):
4 # Update the progress bar
5 pbar.update()
/opt/conda/lib/python3.7/multiprocessing/pool.py in map(self, func, iterable, chunksize)
266 in a list that is returned.
267 '''
--> 268 return self._map_async(func, iterable, mapstar, chunksize).get()
269
270 def starmap(self, func, iterable, chunksize=None):
/opt/conda/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
649
650 def get(self, timeout=None):
--> 651 self.wait(timeout)
652 if not self.ready():
653 raise TimeoutError
/opt/conda/lib/python3.7/multiprocessing/pool.py in wait(self, timeout)
646
647 def wait(self, timeout=None):
--> 648 self._event.wait(timeout)
649
650 def get(self, timeout=None):
/opt/conda/lib/python3.7/threading.py in wait(self, timeout)
550 signaled = self._flag
551 if not signaled:
--> 552 signaled = self._cond.wait(timeout)
553 return signaled
554
/opt/conda/lib/python3.7/threading.py in wait(self, timeout)
294 try: # restore state no matter what (e.g., KeyboardInterrupt)
295 if timeout is None:
--> 296 waiter.acquire()
297 gotit = True
298 else:
KeyboardInterrupt:
This makes me believe it's waiting on some resource, but I'm not sure. Any help would be much appreciated.
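In case it's relevant, I believe multiprocessing defaults to the 'fork' start method on these Linux instances, and I've read that forking after a library has already started threads (which loading the model might do) can deadlock. I haven't confirmed that's what is happening here; checking or switching the start method would look roughly like this:

import multiprocessing
print(multiprocessing.get_start_method())  # 'fork' by default on Linux
# multiprocessing.set_start_method('spawn', force=True)  # untested alternative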
Also, when I run this code, I see a lot of core files created in the SageMaker file explorer (I assume these are core dumps from crashed worker processes, but I'm not sure).