
So I'm looking for a way to speed up the output of the following code, which calls Google's Natural Language API:

import json
import timeit

from google.cloud import language
from google.cloud.language import enums, types

tweets = json.load(input)  # input is an open file handle containing the tweets

client = language.LanguageServiceClient()

sentiment_tweets = []

iterations = 1000  # print progress every 1000 tweets

start = timeit.default_timer()

for i, text in enumerate(d['text'] for d in tweets):

    document = types.Document(
        content=text,
        type=enums.Document.Type.PLAIN_TEXT)

    sentiment = client.analyze_sentiment(document=document).document_sentiment

    results = {'text': text, 'sentiment': sentiment.score, 'magnitude': sentiment.magnitude}

    sentiment_tweets.append(results)

    if (i % iterations) == 0:
        print(i, " tweets processed")

sentiment_tweets_json = [json.dumps(sentiments) for sentiments in sentiment_tweets]

stop = timeit.default_timer()

The issue is that the tweets list is around 100k entries, and iterating and making the calls one by one does not produce output on a feasible timescale. I'm exploring asyncio for parallel calls, although as I'm still a beginner with Python and unfamiliar with the package, I'm not sure whether you can run several instances of the same function as coroutines so that each one works through the list sequentially without processing the same elements. There is also the question of keeping the total number of calls made by the app within the defined quota limits of the API. I just wanted to know if I'm going in the right direction.

AJG

2 Answers


While async is one way you could go, another option that might be easier is Python's built-in multiprocessing module.

from multiprocessing import Pool

def process_tweet(tweet):
    pass  # Fill in the blanks here

# Use five processes at once
with Pool(5) as p:
    # chunksize=1 sends one tweet at a time to each worker
    processed_tweets = p.map(process_tweet, tweets, 1)

In this case "tweets" is an iterator of some sort, and each element of that iterator will get passed to your function. The map function will make sure the results come back in the same order the arguments were supplied.
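For this particular workload, here is a minimal sketch of what process_tweet could look like, assuming the same google.cloud.language client and per-tweet dictionary shape used in the question (the names are illustrative, not a definitive implementation). Note that the client is created inside the worker function, because the client object itself holds locks and sockets and cannot be pickled when work is shipped to the pool:

from google.cloud import language
from google.cloud.language import enums, types

def process_tweet(tweet):
    # Create the client inside the worker; it cannot be pickled across
    # process boundaries.
    client = language.LanguageServiceClient()
    document = types.Document(
        content=tweet['text'],
        type=enums.Document.Type.PLAIN_TEXT)
    sentiment = client.analyze_sentiment(document=document).document_sentiment
    return {'text': tweet['text'],
            'sentiment': sentiment.score,
            'magnitude': sentiment.magnitude}

Creating a client per call is simple but adds overhead; caching one client per worker process (for example via a pool initializer) would be a further refinement.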

Robert Hafner
  • Gave this a go but it seems to be going round in circles (i.e. the function runs over the iterator 3 separate times, rather than interleaving each input sequentially into each instance) and ends in a weird error: `MaybeEncodingError: Error sending result: ''. Reason: 'TypeError("can't pickle _thread.RLock objects")'` – AJG Apr 30 '20 at 20:59
  • Here's the fix to that problem - https://stackoverflow.com/questions/8804830/python-multiprocessing-picklingerror-cant-pickle-type-function/58897266#58897266 – Robert Hafner Apr 30 '20 at 21:00
  • Ah ok, no more error! But there's still the issue of the 3 (I'm using `Pool(3)`) separate instances running parallel over the same elements rather than interleaving sequentially: Desired functionality: Process 1: A -> D Process 2: B -> E Process 3: C -> F Observed functionality: Process 1: A -> B Process 2: A -> B Process 3: A -> B I'm defining the function as the whole for loop in the question above, is that incorrect? – AJG Apr 30 '20 at 21:10
  • Try `p.map(process_tweet, tweets, 1)` to tell it to only send one element at a time. – Robert Hafner Apr 30 '20 at 21:18

I use this method for concurrent calls:

from concurrent import futures as cf

def execute_all(mfs: list, max_workers: int = None):
    """Execute all the functions in the mfs list concurrently.

    Parameters
    ----------
    mfs : list
        [mfs1, mfs2,...]
        mfsN = {
            tag: str,
            fn: function,
            kwargs: dict
        }
    max_workers : int
        Maximum number of worker threads; defaults to len(mfs).

    Returns
    -------
    dict
        {status, result, error}
        status = {tag1, tag2,..}
        result = {tag1, tag2,..}
        error = {tag1, tag2,..}

    """
    result = {
        'status': {},
        'result': {},
        'error': {}
    }
    # Default to one thread per task if no limit was given
    max_workers = max_workers or len(mfs)
    with cf.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each submitted future back to its tag
        my_futures = {
            executor.submit(x['fn'], **x['kwargs']): x['tag'] for x in mfs
        }
        for future in cf.as_completed(my_futures):
            tag = my_futures[future]
            try:
                result['result'][tag] = future.result()
                result['status'][tag] = 0
            except Exception as err:
                result['error'][tag] = err
                result['result'][tag] = None
                result['status'][tag] = 1
    return result

Each result comes back indexed by its tag (which matters if you need to identify which call returned which result), given input of the form:

mfs = [
    {
        'tag': 'tweet1',
        'fn': process_tweet,
        'kwargs': {
            'tweet': tweet1
        }
    },
    {
        'tag': 'tweet2',
        'fn': process_tweet,
        'kwargs': {
            'tweet': tweet2
        }
    },
]

results = execute_all(mfs, 2)
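As a usage note, here is a hedged sketch of how the question's tweets list could be fed through execute_all, assuming a process_tweet helper like the one sketched in the other answer; the tag scheme and worker count are illustrative assumptions, and a real run over ~100k tweets would still need batching to stay within the API quota:

# Build one task entry per tweet; tags let us map results back to tweets.
# process_tweet is assumed to call the Natural Language API for one tweet.
mfs = [
    {
        'tag': 'tweet{}'.format(i),
        'fn': process_tweet,
        'kwargs': {'tweet': tweet}
    }
    for i, tweet in enumerate(tweets)
]

results = execute_all(mfs, max_workers=10)  # worker count is an illustrative choice

# Collect the successful results back in the original order.
sentiment_tweets = [
    results['result']['tweet{}'.format(i)]
    for i in range(len(tweets))
    if results['status']['tweet{}'.format(i)] == 0
]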
ontananza