
I have the following piece of code:

import sentence_transformers
import multiprocessing
from tqdm import tqdm
from multiprocessing import Pool
import numpy as np

embedding_model = sentence_transformers.SentenceTransformer('sentence-transformers/all-mpnet-base-v2')

data = [[100227, 7382501.0, 'view', 30065006, False, ''],
 [100227, 7382501.0, 'view', 57072062, True, ''],
 [100227, 7382501.0, 'view', 66405922, True, ''],
 [100227, 7382501.0, 'view', 5221475, False, ''],
 [100227, 7382501.0, 'view', 63283995, True, '']]

df_text = dict()
df_text[7382501] = {'title': 'The Geography of the Internet Industry, Venture Capital, Dot-coms, and Local Knowledge - MATTHEW A. ZOOK', 'abstract': '23', 'highlight': '12'}
df_text[30065006] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[57072062] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[66405922] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[5221475] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[63283995] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}


# Define the function to be executed in parallel
def process_data(chunk):
    results = []
    for row in chunk:
        print(row[0])
        work_id = row[1]
        mentioning_work_id = row[3]
        print(work_id)

        if work_id in df_text and mentioning_work_id in df_text:
            title1 = df_text[work_id]['title']
            title2 = df_text[mentioning_work_id]['title']
            embeddings_title1 = embedding_model.encode(title1,convert_to_numpy=True)
            embeddings_title2 = embedding_model.encode(title2,convert_to_numpy=True)
            
            similarity = np.matmul(embeddings_title1, embeddings_title2.T)
            
            results.append([row[0],row[1],row[2],row[3],row[4],similarity])
        else:
            continue
    return results

# Define the number of CPU cores to use
num_cores = multiprocessing.cpu_count()

# Split the data into chunks
chunk_size = len(data) // num_cores
chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]

# Create a pool of worker processes
pool = multiprocessing.Pool(processes=num_cores)

results = []
with tqdm(total=len(data)) as pbar:
    for i, result_chunk in enumerate(pool.map(process_data, chunks)):
        # Update the progress bar
        pbar.update()
        # Add the results to the list
        results += result_chunk

# Concatenate the results
final_result = results

I am running this code on Amazon SageMaker and it runs just fine on an instance with 2 CPUs: it shows the progress bar and everything. But when I run it on a larger instance with more CPUs, it just hangs and doesn't progress at all. When I finally stop the kernel, I get this error:

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-18-19449c86abd3> in <module>
      1 results = []
      2 with tqdm(total=len(chunks)) as pbar:
----> 3     for i, result_chunk in enumerate(pool.map(process_data, chunks)):
      4         # Update the progress bar
      5         pbar.update()

/opt/conda/lib/python3.7/multiprocessing/pool.py in map(self, func, iterable, chunksize)
    266         in a list that is returned.
    267         '''
--> 268         return self._map_async(func, iterable, mapstar, chunksize).get()
    269 
    270     def starmap(self, func, iterable, chunksize=None):

/opt/conda/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
    649 
    650     def get(self, timeout=None):
--> 651         self.wait(timeout)
    652         if not self.ready():
    653             raise TimeoutError

/opt/conda/lib/python3.7/multiprocessing/pool.py in wait(self, timeout)
    646 
    647     def wait(self, timeout=None):
--> 648         self._event.wait(timeout)
    649 
    650     def get(self, timeout=None):

/opt/conda/lib/python3.7/threading.py in wait(self, timeout)
    550             signaled = self._flag
    551             if not signaled:
--> 552                 signaled = self._cond.wait(timeout)
    553             return signaled
    554 

/opt/conda/lib/python3.7/threading.py in wait(self, timeout)
    294         try:    # restore state no matter what (e.g., KeyboardInterrupt)
    295             if timeout is None:
--> 296                 waiter.acquire()
    297                 gotit = True
    298             else:

KeyboardInterrupt: 

This makes me believe that it's waiting on resources, but I'm not sure. Any help in this regard would be much appreciated. Also, when I run this code, I see a lot of core files created in the SageMaker file explorer. [screenshot: SageMaker file explorer showing several core dump files]

  • Running into this error while trying to reproduce this: `NameError: name 'df_text' is not defined` – Nick ODell Apr 28 '23 at 20:32
  • Ahh, yes. Let me reproduce `df_text` for you real quick. – Patthebug Apr 28 '23 at 20:33
  • Your chunking isn't doing anything here. `map` waits for the entire dataset to process before returning. You could let the pool do the chunking for you and use either `imap` or `imap_unordered` to do the tqdm thing (a sketch of this is below). If your larger instance also has a larger dataset, you may see a long hang. – tdelaney Apr 28 '23 at 20:34
  • @NickODell Added a sample `df_text`. It's actually a dict(), not a dataframe. Just bad naming on my end. – Patthebug Apr 28 '23 at 20:42
  • @tdelaney But then why does it work with 2 CPUs? Shouldn't it fail there too? I am trying to modify `pool.map` to `pool.imap` and use the `chunksize` argument in it. – Patthebug Apr 28 '23 at 20:48
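
A minimal sketch of the `imap_unordered` approach from the comments, assuming the same module-level `data`, `df_text`, and `embedding_model` as in the question; `process_row` is a hypothetical per-row helper so the pool's own `chunksize` argument can do the batching, and results stream back as they finish, which keeps tqdm moving:

import multiprocessing
import numpy as np
from tqdm import tqdm

def process_row(row):
    work_id, mentioning_work_id = row[1], row[3]
    if work_id in df_text and mentioning_work_id in df_text:
        e1 = embedding_model.encode(df_text[work_id]['title'], convert_to_numpy=True)
        e2 = embedding_model.encode(df_text[mentioning_work_id]['title'], convert_to_numpy=True)
        # dot product of the two 1-D embedding vectors, as in the original code
        return row[:5] + [float(np.dot(e1, e2))]
    return None

if __name__ == '__main__':
    results = []
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        # chunksize lets the pool batch rows itself; imap_unordered yields each
        # result as soon as it is ready, so the progress bar advances steadily
        for res in tqdm(pool.imap_unordered(process_row, data, chunksize=8), total=len(data)):
            if res is not None:
                results.append(res)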

1 Answer


I'm not sure how applicable this solution is to Amazon SageMaker, but I find that I can avoid the deadlock inside the sentence tokenizer by setting the start method to 'spawn', which increases the isolation between processes.

import sentence_transformers
import multiprocessing
from tqdm import tqdm
import numpy as np

embedding_model = sentence_transformers.SentenceTransformer('sentence-transformers/all-mpnet-base-v2')

data = [[100227, 7382501.0, 'view', 30065006, False, ''],
 [100227, 7382501.0, 'view', 57072062, True, ''],
 [100227, 7382501.0, 'view', 66405922, True, ''],
 [100227, 7382501.0, 'view', 5221475, False, ''],
 [100227, 7382501.0, 'view', 63283995, True, '']]

df_text = dict()
df_text[7382501] = {'title': 'The Geography of the Internet Industry, Venture Capital, Dot-coms, and Local Knowledge - MATTHEW A. ZOOK', 'abstract': '23', 'highlight': '12'}
df_text[30065006] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[57072062] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[66405922] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[5221475] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[63283995] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}


# Define the function to be executed in parallel
def process_data(chunk):
    results = []
    for row in chunk:
        print(row[0])
        work_id = row[1]
        mentioning_work_id = row[3]
        print(work_id)

        if work_id in df_text and mentioning_work_id in df_text:
            title1 = df_text[work_id]['title']
            title2 = df_text[mentioning_work_id]['title']
            embeddings_title1 = embedding_model.encode(title1,convert_to_numpy=True)
            embeddings_title2 = embedding_model.encode(title2,convert_to_numpy=True)

            similarity = np.matmul(embeddings_title1, embeddings_title2.T)

            results.append([row[0],row[1],row[2],row[3],row[4],similarity])
        else:
            continue
    return results

if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    # Define the number of CPU cores to use
    # num_cores = multiprocessing.cpu_count()
    num_cores = 4

    # Split the data into chunks
    chunk_size = len(data) // num_cores
    chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]

    # Create a pool of worker processes

    results = []
    with multiprocessing.Pool(processes=num_cores) as pool:
        with tqdm(total=len(data)) as pbar:
            for i, result_chunk in enumerate(pool.map(process_data, chunks)):
                # Update the progress bar
                pbar.update(len(result_chunk))
                # Add the results to the list
                results += result_chunk

    # Concatenate the results
    final_result = results
    print(final_result)
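
One thing worth noting about this version: with the 'spawn' start method, each worker starts a fresh interpreter and re-imports the module, so the module-level model load runs once per worker, while the `if __name__ == '__main__':` guard keeps the children from re-executing the pool setup themselves.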
  • I tried this, but once again it just hung on me and I had to interrupt the kernel. When I killed it, I got the same error I posted in the original question. I am wondering if the core files SageMaker creates (shown in the question) are an issue here. – Patthebug Apr 28 '23 at 21:33
  • @Patthebug The core files suggest that the subprocesses are encountering some unrecoverable error, and producing a core dump. Hard to say what the cause could be. You could try [debugging the core dump](https://stackoverflow.com/questions/5115613/core-dump-file-analysis), but that's generally pretty tricky. – Nick ODell Apr 28 '23 at 21:37
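
As a hypothetical diagnostic sketch (not something suggested in the thread): enabling the standard-library `faulthandler` in each worker via the pool's `initializer` makes a worker that dies on a fatal signal print a Python traceback to stderr before it produces a core file, which is usually much easier to read than the dump itself:

import faulthandler
import multiprocessing

def init_worker():
    # Print a Python traceback to stderr if this worker receives a fatal
    # signal (SIGSEGV, SIGABRT, ...) instead of dying silently.
    faulthandler.enable()

if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    with multiprocessing.Pool(processes=4, initializer=init_worker) as pool:
        # submit work exactly as before, e.g.:
        # results = pool.map(process_data, chunks)
        pass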