
I'm trying to use multiprocessing to speed up some data labeling that I'm doing, but I've noticed that it takes much longer (I actually never saw the program terminate). The original script takes about 7 hours to run, but this morning I came to work and noticed the parallel version was still running after I had left it going yesterday evening.

Task Overview

Input:
  1) Pandas DataFrame with a column of text
  2) Dictionary that looks like {word: label}.

(Desired) Output:
  Same DataFrame but this time with the positions of the words marked in the text.

Example:
DataFrame:
----------------------------------------------------
  | text
0 | I live in the United States.
----------------------------------------------------

Dict: {'United States': 'country'}

Output DataFrame:
----------------------------------------------------
  | text                         | labels
0 | I live in the United States. | [14, 27]
----------------------------------------------------

To explain the outcome a bit: the substring 'United States' spans positions 14 to 27 (start inclusive, end exclusive) within the text. I'm basically iterating over the DataFrame, further iterating over the dictionary, and then marking the positions using regular expressions.
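For reference, this is exactly what `re` match spans give, since `match.end()` is end-exclusive:

import re

text = 'I live in the United States.'
match = re.search('United States', text)
print(match.span())  # (14, 27) - start inclusive, end exclusive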

What I Did

<Original Code>

import re
from tqdm import tqdm

def label_data(df, label_dict):
    pbar = tqdm(iterable=df.to_dict('records'), total=df.shape[0])
    for idx, row in enumerate(pbar):
        text = row['text']
        spans = []
        for word, label in label_dict.items():
            for match in re.finditer(word, text):
                start = match.start()
                end = match.end()
                spans.append([start, end])

        row['labels'] = spans
        df.iloc[idx] = row

    return df

<Parallelized Code>
from itertools import product
import multiprocessing as mp
import numpy as np
import pandas as pd

def label_data_parallel(df, label_dict):
    num_cores = mp.cpu_count() - 1
    pool = mp.Pool(num_cores)

    df_chunks = np.array_split(df, num_cores)
    labeled_dfs = pool.starmap(label_data,
                               product(df_chunks, [label_dict] * num_cores))
    df = pd.concat(labeled_dfs, axis=0)

    pool.close()
    pool.join()

    return df

Is there anything wrong with my code? Also, the DataFrame has around 200,000 rows and the dictionary has about 3,000 key-value pairs.

Sean
  • Before debugging the code, may I ask why you used `multiprocessing`? If it's because you don't know about `concurrent.futures`, check it out. It'll make your life so much easier. Also, to be able to answer the original question, is it possible to post some sample data? – spramuditha Oct 21 '22 at 01:24
  • Okay, sheesh, I understand your data now. And I don't see any obvious errors in the code either. It could be that the operation is simply not optimal to parallelise this way: assuming you have, say, 10 processes, each would still have to loop 3000*2000 times just to complete. The way you have constructed it, I doubt it helps with time. Regardless, could you kindly provide a trimmed-down dataframe (say, 10 dict entries with 100 rows)? I may be able to post some code that could help. – spramuditha Oct 21 '22 at 01:38
  • Thanks for the suggestions, I'm looking at the `concurrent.futures` library now. Regarding posting an example, the reason I was hesitant is that the data isn't in English lol. I'll post an example regardless though. – Sean Oct 21 '22 at 01:40
  • Haha, that's totally fine. We are, after all, talking about coded languages. :) One suggestion even when you are using `concurrent.futures`: multiprocessing tends to slow down aggregate time if you use up your entire available computing power, because these libraries already have optimised internal operations. So it's good practice to leave at least some cores or processes free for normal tasks (define max_workers in whichever concurrent.futures executor you choose; see the sketch after these comments). – spramuditha Oct 21 '22 at 01:44
  • What if there are two matches in the same string? How do you label it? – Nikolay Zakirov Oct 21 '22 at 01:46
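A minimal sketch of the concurrent.futures approach suggested in the comments (the name label_data_parallel_futures is hypothetical; it assumes the question's label_data is defined at module level):

from concurrent.futures import ProcessPoolExecutor
from functools import partial
import multiprocessing as mp
import numpy as np
import pandas as pd

def label_data_parallel_futures(df, label_dict):
    # Leave one core free for the main process, per the comment above
    num_workers = max(1, mp.cpu_count() - 1)
    df_chunks = np.array_split(df, num_workers)
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        # map() pairs each chunk with the same label_dict
        chunk_iter = executor.map(partial(label_data, label_dict=label_dict),
                                  df_chunks)
        labeled_dfs = list(chunk_iter)
    return pd.concat(labeled_dfs, axis=0)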

2 Answers


Have you thought about another algorithm?

Three ideas:

  1. Don't iterate over the DataFrame; instead, search through one combined text. This kind of search has been optimized and studied for decades, so it should be quite fast, and it is hopefully well implemented in Python's re library. However, to avoid adding labels that only match because lines were glued together, insert a gibberish separator; I used "@@@@@@". The gibberish separator (I admit it doesn't look good) could be replaced by a simple check that a match happened on a line border, so it can be skipped.
  2. All keys to be searched can also be glued into one regex pattern, so all the work is done by the re library in a more efficient way.
  3. The regex pattern can be optimized as a trie, as suggested here: Speed up millions of regex replacements in Python 3 (see the illustrating comment where the pattern is built in the code below).

E.g. like this:

import nltk
import pandas as pd
import re
import string
import random
from nltk.corpus import words

random.seed(1)

# ------- this is just to create nice test data. Otherwise no need in nltk

nltk.download('words')

all_words = words.words()

data_df = pd.DataFrame(
    [
        ' '.join(random.choices(all_words, k=random.randint(1,20))) for _ in range(200000)
    ], columns = ["text"])

label_keys = {
    random.choice(all_words) for _ in range(3000)    
}

# -------- code starts here

class Trie():
    """Regex::Trie in Python. Creates a Trie out of a list of words. The trie can be exported to a Regex pattern.
    The corresponding Regex should match much faster than a simple Regex union."""

    def __init__(self):
        self.data = {}

    def add(self, word):
        ref = self.data
        for char in word:
            ref[char] = char in ref and ref[char] or {}
            ref = ref[char]
        ref[''] = 1

    def dump(self):
        return self.data

    def quote(self, char):
        return re.escape(char)

    def _pattern(self, pData):
        data = pData
        if "" in data and len(data.keys()) == 1:
            return None

        alt = []
        cc = []
        q = 0
        for char in sorted(data.keys()):
            if isinstance(data[char], dict):
                try:
                    recurse = self._pattern(data[char])
                    alt.append(self.quote(char) + recurse)
                except:
                    cc.append(self.quote(char))
            else:
                q = 1
        cconly = not len(alt) > 0

        if len(cc) > 0:
            if len(cc) == 1:
                alt.append(cc[0])
            else:
                alt.append('[' + ''.join(cc) + ']')

        if len(alt) == 1:
            result = alt[0]
        else:
            result = "(?:" + "|".join(alt) + ")"

        if q:
            if cconly:
                result += "?"
            else:
                result = "(?:%s)?" % result
        return result

    def pattern(self):
        return self._pattern(self.dump())

trie_pattern = Trie()

for label in label_keys:
    trie_pattern.add(label)  # Trie.quote() already escapes each character, so don't pre-escape

reg_pattern = trie_pattern.pattern()
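
# For intuition: labels sharing a prefix are collapsed by the trie. E.g.
# the two labels 'United Kingdom' and 'United States' would yield a pattern
# along the lines of r'United\ (?:Kingdom|States)' rather than the plain
# union r'United\ Kingdom|United\ States', which is what makes matching faster.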

list_of_texts = list(data_df.text)

indices = list(map(len, list_of_texts))  # length of each original line of text

all_text = "@@@@@@".join(data_df.text) # @@@@@@ - something of known length you don't expect in the text

all_matches = []
for match_ in re.finditer(reg_pattern, all_text):
    all_matches.append(match_.span())
all_matches.sort(key=lambda x: x[0])

label_l = []
start = 0
all_matches_pointer = 0
indices_pointer = 0
label_l.append([]) 
while True:    
    if all_matches_pointer >= len(all_matches):
        for _ in range(len(label_l),len(data_df)):
            label_l.append( [])
        break
    match_start = all_matches[all_matches_pointer][0]
    match_end = all_matches[all_matches_pointer][1]
    if match_start >= start + indices[indices_pointer]:
        label_l.append([]) 
        start += indices[indices_pointer] + 6 # len("@@@@@@")
        indices_pointer += 1
    else:
        label_l[-1] += [(match_start - start, match_end - start)]
        all_matches_pointer += 1
            
    
data_df["labels"] = label_l
data_df

gives you the desired result in a few seconds:

    text    labels
0   overempty stirring asyla butchering Sherrymoor  [(5, 6), (19, 21), (42, 43)]
1   premeditator spindliness bilamellate amidosucc...   [(3, 4), (8, 10), (29, 30), (33, 35), (38, 39)...
2   Radek vivicremation rusot noegenetic Shropshir...   [(13, 14), (14, 16), (50, 52), (76, 78), (88, ...
3   uninstructiveness blintze plunging rowiness fi...   [(58, 59), (87, 88), (109, 110), (122, 124), (...
4   memorialize scruffman   [(0, 1), (2, 3), (18, 19)]
... ... ...
199995  emulsor treatiser   [(1, 2), (11, 13)]
199996  squibling anisandrous incorrespondent vague jo...   [(13, 15), (40, 43), (52, 53), (71, 73), (130,...
199997  proallotment bulletheaded uningenuousness plat...   [(0, 5), (8, 9), (44, 46), (62, 65), (75, 77)]
199998  unantiquatedness sulphohalite oversoftness und...   [(6, 10), (32, 35), (65, 67), (68, 71), (118, ...
199999  lenticulothalamic aerometric plastidium panell...   [(14, 15), (22, 23), (31, 33), (38, 39), (46, ...
200000 rows × 2 columns

So I tried specifically with your parameters (see code): a dataframe of 200k rows and 3000 labels. The algorithm runs in just 3-5 seconds on my M1.

Problems not tackled yet that really depend on your input:

  1. What if labels overlap inside one line? Then you would need to add a loop so that each iteration searches for each label separately (and that can be multiprocessed); see the sketch below.
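A minimal sketch of that per-label fallback (the helper find_spans_per_label is hypothetical), searching each label's pattern separately so overlapping matches are all found:

import re

def find_spans_per_label(text, labels):
    # Searching one label at a time catches matches that a single combined
    # pattern would skip, because finditer returns non-overlapping matches.
    spans = []
    for label in labels:
        for match in re.finditer(re.escape(label), text):
            spans.append(match.span())
    spans.sort()
    return spans

# find_spans_per_label('New York City', ['New York', 'York City'])
# -> [(0, 8), (4, 13)]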
Nikolay Zakirov

There are several efficiency improvements that can be made:

  1. Instead of doing re.finditer on each country, you could create an optimized single regular expression that searches for any of the countries in label_dict and pass that regex to your worker function. Given that you are searching for 3,000 countries, this should greatly improve the speed.
  2. You only need to pass an array of strings instead of an array of dataframes to the worker function, along with the aforementioned compiled regular expression.
  3. You are leaving a processor free for the main process by creating a pool of size mp.cpu_count() - 1. But you are then calling starmap, which blocks until all the results have been returned, at which point the pool processes are idle. Instead you could use method imap, which can start processing results as soon as a worker function returns something. But the amount of processing being done by the main process may not warrant dedicating a processor to it. In the code below I am using all the available processors for constructing the pool, but you could try leaving one for the main process to see if that is more performant.
  4. The worker function only has to return a list of the spans it found. The main process will add a new column to the original dataframe using this data.
def label_data(texts, regex):
    return [
        [[match.start(), match.end()] for match in regex.finditer(text)]
        for text in texts
    ]

def label_data_parallel(df, label_dict):
    import multiprocessing as mp
    import numpy as np
    import re
    from functools import partial

    class Trie():
        """Regex::Trie in Python. Creates a Trie out of a list of words. The trie can be exported to a Regex pattern.
        The corresponding Regex should match much faster than a simple Regex union."""

        def __init__(self):
            self.data = {}

        def add(self, word):
            ref = self.data
            for char in word:
                ref[char] = char in ref and ref[char] or {}
                ref = ref[char]
            ref[''] = 1

        def dump(self):
            return self.data

        def quote(self, char):
            return re.escape(char)

        def _pattern(self, pData):
            data = pData
            if "" in data and len(data.keys()) == 1:
                return None

            alt = []
            cc = []
            q = 0
            for char in sorted(data.keys()):
                if isinstance(data[char], dict):
                    try:
                        recurse = self._pattern(data[char])
                        alt.append(self.quote(char) + recurse)
                    except:
                        cc.append(self.quote(char))
                else:
                    q = 1
            cconly = not len(alt) > 0

            if len(cc) > 0:
                if len(cc) == 1:
                    alt.append(cc[0])
                else:
                    alt.append('[' + ''.join(cc) + ']')

            if len(alt) == 1:
                result = alt[0]
            else:
                result = "(?:" + "|".join(alt) + ")"

            if q:
                if cconly:
                    result += "?"
                else:
                    result = "(?:%s)?" % result
            return result

        def pattern(self):
            return self._pattern(self.dump())


    num_cores = mp.cpu_count()

    text_chunks = np.array_split(df['text'].values.tolist(), num_cores)

    trie = Trie()
    for country in label_dict.keys():
        trie.add(country)
    regex = re.compile(trie.pattern())

    pool = mp.Pool(num_cores)

    label_spans = []
    for spans in pool.imap(partial(label_data, regex=regex), text_chunks):
        label_spans.extend(spans)
    pool.close()
    pool.join()

    df['labels'] = label_spans


    return df

def main():
    import pandas as pd

    df = pd.DataFrame({'text': [
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
    ]})

    label_dict = {
        'United States': 'country',
        'France': 'country',
    }

    label_data_parallel(df, label_dict)
    print(df)

if __name__ == '__main__':
    main()

Prints:

                                                 text              labels
0                        I live in the United States.          [[14, 27]]
1                        I live in the United States.          [[14, 27]]
2                        I live in the United States.          [[14, 27]]
3                        I live in the United States.          [[14, 27]]
4                        I live in the United States.          [[14, 27]]
5                        I live in the United States.          [[14, 27]]
6                        I live in the United States.          [[14, 27]]
7                        I live in the United States.          [[14, 27]]
8   France is where I live But I used to live in t...  [[0, 6], [49, 62]]
9   France is where I live But I used to live in t...  [[0, 6], [49, 62]]
10  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
11  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
12  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
13  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
14  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
15  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
Booboo