
I store QueryText within a pandas DataFrame. Once I've loaded all the queries, I want to conduct an analysis against each query. Currently, I have ~50k to evaluate, so doing it one by one will take a long time.

So, I wanted to implement concurrent.futures. How do I take the individual QueryText stored within fullAnalysis, pass it to concurrent.futures, and return the output as a variable?

Here is my entire code:

import pandas as pd
import time
import gensim
import sys
import warnings

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

fullAnalysis = pd.DataFrame()

def fetch_data(jFile = 'ProcessingDetails.json'):
    print("Fetching data...please wait")

    #read JSON file for latest dictionary file name
    baselineDictionaryFileName = 'Dictionary/Dictionary_05-03-2020.json'

    #copy data to pandas dataframe
    labelled_data = pd.read_json(baselineDictionaryFileName)

    #Add two more columns to get the most similar text and score
    labelled_data['SimilarText'] = ''
    labelled_data['SimilarityScore'] = float()

    print("Data fetched from " + baselineDictionaryFileName + " and there are " + str(labelled_data.shape[0]) + " rows to be evalauted")

    return labelled_data


def calculateScore(inputFunc):
    warnings.filterwarnings("ignore", category=DeprecationWarning) 

    model = gensim.models.Word2Vec.load('w2v_model_bigdata')

    inp = inputFunc
    print(inp)
    out = dict()

    strEvaluation = inp.split("most_similar ",1)[1]

    #while inp != 'quit':
    split_inp = inp.split()

    try:
        if split_inp[0] == 'help':
            pass
        elif split_inp[0] == 'similarity' and len(split_inp) >= 3:
            pass
        elif split_inp[0] == 'most_similar' and len(split_inp) >= 2:
            for pair in model.most_similar(positive=[split_inp[1]]):
                out.update({pair[0]: pair[1]})

    except KeyError as ke:
        #print(str(ke) + "\n")
        inp = input()
    return out

def main():
    with ThreadPoolExecutor(max_workers=5) as executor:
        for i in range(len(fullAnalysis)):
            text = fullAnalysis['QueryText'][i]
            arg = 'most_similar'+ ' ' + text
            #for item in executor.map(calculateScore, arg):
            output = executor.map(calculateScore, arg)

    return output

if __name__ == "__main__":
    fullAnalysis = fetch_data()
    results = main()
    print(f'results: {results}')

2 Answers


The Python Global Interpreter Lock, or GIL, allows only one thread at a time to hold control of the Python interpreter. Since your function calculateScore is probably CPU-bound and requires the interpreter to execute its bytecode, you may be gaining little by using threading. If, on the other hand, it were doing mostly I/O operations, it would be giving up the GIL for most of its running time, allowing other threads to run. But that does not seem to be the case here. You should probably be using the ProcessPoolExecutor from concurrent.futures (try it both ways and see):

from concurrent.futures import ProcessPoolExecutor, as_completed

def main():
    with ProcessPoolExecutor(max_workers=None) as executor:
        the_futures = {}
        for i in range(len(fullAnalysis)):
            text = fullAnalysis['QueryText'][i]
            arg = 'most_similar ' + text
            future = executor.submit(calculateScore, arg)
            the_futures[future] = i  # map future back to the original index
        for future in as_completed(the_futures):  # results arrive as they complete, not necessarily in submission order
            i = the_futures[future]  # the original index
            result = future.result()  # the result

If you omit the max_workers parameter (or specify a value of None) in the ProcessPoolExecutor constructor, the default will be the number of processors on your machine (not a bad default). There is no point in specifying a value larger than the number of processors you have.
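For reference, here is a quick, standard-library-only sketch showing what that default resolves to on a given machine:

import os
from concurrent.futures import ProcessPoolExecutor

print(os.cpu_count())  # the pool size you get with max_workers=None

with ProcessPoolExecutor() as executor:  # same as max_workers=None
    pass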

If you do not need to tie the future back to the original request, then the_futures can just be a list to which you append each future. But simpler yet is to not even bother with the as_completed method:

def main():
    with ProcessPoolExecutor(max_workers=5) as executor:
        the_futures = []
        for i in range(len(fullAnalysis)):
            text = fullAnalysis['QueryText'][i]
            arg = 'most_similar ' + text
            future = executor.submit(calculateScore, arg)
            the_futures.append(future)
        # wait for the completion of all the results and return them all:
        results = [f.result() for f in the_futures]  # results in submission order
        return results

It should be mentioned that the code that launches the ProcessPoolExecutor should be in a block governed by if __name__ == '__main__':. If it isn't, you will get into a recursive loop, with each subprocess launching its own ProcessPoolExecutor. But that seems to be handled here already. Perhaps you meant to use the ProcessPoolExecutor all along?
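In other words, the shape to preserve is the following (a minimal sketch; the submit call in the comment refers to the question's calculateScore):

from concurrent.futures import ProcessPoolExecutor

def main():
    with ProcessPoolExecutor() as executor:
        ...  # submit work here, e.g. executor.submit(calculateScore, arg)

if __name__ == '__main__':
    # only the parent process executes this block; spawned workers
    # re-import the module but skip it, so no recursive pool creation
    main()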

Also:

I don't know what the line ...

model = gensim.models.Word2Vec.load('w2v_model_bigdata')

... in function calculateScore does. It may be the one I/O-bound statement. But it appears to be something that does not vary from call to call. If that is the case and model is not being modified in the function, shouldn't this statement be moved out of the function and computed just once? Then this function would clearly run faster (and be clearly CPU-bound).
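With processes, one way to do that is the initializer argument of ProcessPoolExecutor (available since Python 3.7). This is a sketch reusing the model path from your code, not necessarily how you must structure it:

import gensim
from concurrent.futures import ProcessPoolExecutor

model = None  # filled in once per worker process

def init_worker():
    # runs once in each worker process, so the model is loaded
    # once per worker instead of once per submitted query
    global model
    model = gensim.models.Word2Vec.load('w2v_model_bigdata')

# executor = ProcessPoolExecutor(max_workers=None, initializer=init_worker)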

Also:

The exception block ...

except KeyError as ke:
    #print(str(ke) + "\n")
    inp = input()

... is puzzling. You are inputting a value that will never be used, right before returning. If this is meant to pause execution, there is no error message being output first.
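If the intent is simply to skip words that are missing from the model's vocabulary, here is a sketch of a less surprising alternative (calculate_score_safe and its message text are illustrative, not from the original code):

def calculate_score_safe(model, inp):
    out = {}
    split_inp = inp.split()
    try:
        if split_inp and split_inp[0] == 'most_similar' and len(split_inp) >= 2:
            for word, score in model.most_similar(positive=[split_inp[1]]):
                out[word] = score
    except KeyError as ke:
        # word not in the vocabulary: report it and return the empty
        # dict rather than blocking the worker process on input()
        print('Skipping, word not in vocabulary: ' + str(ke))
    return out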

  • Hi Booboo... thank you for the solution. I do need to know the original query details. When I run "with ProcessPoolExecutor(max_workers=None) as executor:" I get the following error: "A process in the process pool was terminated abruptly while the future was running or pending." The line "model = gensim.models.Word2Vec.load('w2v_model_bigdata')" is the output of another PY process that trains the model using gensim model. Good pick-up on the exception block. I need to update it. – emie May 06 '20 at 04:46
  • I get the error on this line: results = future.result() You mentioned that the process should be governed by "if __name__ == '__main__':". Does that mean that I should move the 'code' to name? If not, then what should I do to avoid the issue that you pointed out. Thanks again for the assistance. – emie May 06 '20 at 04:55
  • I ended up reading this post and I resolved the "A process in the process pool was terminated abruptly while the future was running or pending." https://stackoverflow.com/questions/43836876/processpoolexecutor-works-on-ubuntu-but-fails-with-brokenprocesspool-when-r – emie May 06 '20 at 06:43
  • The code that executes `with ProcessPoolExecutor(max_workers=None) as executor:` is in function `main`. But function `main` only gets invoked when called, and that call only happens in the block that begins `if __name__ == '__main__':`. So you should be okay as is. Is everything else resolved? – Booboo May 06 '20 at 09:25
  • Also, in my version that does not use `as_completed`, I did not mean to specify `max_workers=5` -- that should have been `max_workers=None` (or omitted altogether). – Booboo May 06 '20 at 09:31
  • Thanks for the feedback, Booboo. The code you assisted with has improved processing time by more than 60%. – emie May 06 '20 at 20:35

With Booboo's assistance, I was able to update my code to include ProcessPoolExecutor. Here is my updated code. Overall, processing has been sped up by more than 60%.

I did run into a processing issue and found this topic, BrokenProcessPool, that addresses it.

import sys
import time
import operator
from concurrent.futures import ProcessPoolExecutor, as_completed

import winprocess  # helper module from the BrokenProcessPool workaround linked above

output = {}
thePool = {}

def main(labelled_data, dictionaryRevised):

    args = sys.argv[1:]

    with ProcessPoolExecutor(max_workers=None) as executor:
        for i in range(len(labelled_data)):
            text = labelled_data['QueryText'][i]
            arg = 'most_similar ' + text

            # winprocess.submit wraps executor.submit (see the linked workaround)
            output = winprocess.submit(executor, calculateScore, arg)
            thePool[output] = i  # map future back to the original index


        for output in as_completed(thePool):  # results arrive as they complete, not necessarily in submission order
            i = thePool[output]  # the original index
            text = labelled_data['QueryText'][i]
            result = output.result()  # the result dict from calculateScore

            # pick the most similar word (the key with the highest score)
            maximumKey = max(result.items(), key=operator.itemgetter(1))[0]
            maximumValue = result.get(maximumKey)

            # .at avoids pandas chained-assignment (SettingWithCopy) warnings
            labelled_data.at[i, 'SimilarText'] = maximumKey
            labelled_data.at[i, 'SimilarityScore'] = maximumValue


    return labelled_data, dictionaryRevised

if __name__ == "__main__":
    start = time.perf_counter()

    print("Starting to evaluate Query Text for labelling...")

    output_Labelled_Data, output_dictionary_revised = preProcessor()

    output,dictionary = main(output_Labelled_Data, output_dictionary_revised)


    finish = time.perf_counter()
    print(f'Finished in {round(finish-start, 2)} second(s)')