
I have a database record set (approx. 1000 rows) and I am currently iterating through it, enriching each record with data from an extra database query.

Doing that raises the overall processing time to roughly 100 seconds.

What I want to do is split this work across 2-4 processes.

I am using Python 2.7 to have AWS Lambda compatibility.

def handler(event, context):

    try:

        records = connection.get_users()

        mandrill_client = open_mandrill_connection()

        mandrill_messages = get_mandrill_messages()

        mandrill_template = 'POINTS weekly-report-to-user'

        start_time = time.time()

        messages = build_messages(mandrill_messages, records)

        print("OVERALL: %s seconds ---" % (time.time() - start_time))

        send_mandrill_message(mandrill_client, mandrill_template, messages)

        connection.close_database_connection()

        return "Process Completed"

    except Exception as e:

        print(e)

Following is the function which I want to put into threads:

def build_messages(messages, records):

    for record in records:

        record = dict(record)

        stream = get_user_stream(record)

        data = compile_loyalty_stream(stream)

        messages['to'].append({
            'email': record['email'],
            'type': 'to'
        })

        messages['merge_vars'].append({
            'rcpt': record['email'],
            'vars': [
                {
                    'name': 'total_points',
                    'content': record['total_points']
                },
                {
                    'name': 'total_week',
                    'content': record['week_points']
                },
                {
                    'name': 'stream_greek',
                    'content': data['el']
                },
                {
                    'name': 'stream_english',
                    'content': data['en']
                }
            ]
        })

    return messages

What I have tried is importing the multiprocessing library:

from multiprocessing.pool import ThreadPool

Created a pool inside the try block and mapped the function inside this pool:

pool = ThreadPool(4)

messages = pool.map(build_messages_in, itertools.izip(itertools.repeat(mandrill_messages), records))

def build_messages_in(a_b):
    build_msg(*a_b)


def build_msg(a, b):
    return build_messages(a, b)

def get_user_stream(record):

    response = []

    i = 0

    for mod, mod_id, act, p, act_created in izip(record['models'], record['model_ids'], record['actions'],
                                                 record['points'], record['action_creation']):

        information = get_reference(mod, mod_id)

        if information:

            response.append({
                'action': act,
                'points': p,
                'created': act_created,
                'info': information
            })

            if (act == 'invite_friend') \
                    or (act == 'donate') \
                    or (act == 'bonus_500_general') \
                    or (act == 'bonus_1000_general') \
                    or (act == 'bonus_500_cancel') \
                    or (act == 'bonus_1000_cancel'):

                response[i]['info']['date_ref'] = act_created
                response[i]['info']['slug'] = 'attiki'

            if (act == 'bonus_500_general') \
                    or (act == 'bonus_1000_general') \
                    or (act == 'bonus_500_cancel') \
                    or (act == 'bonus_1000_cancel'):

                response[i]['info']['title'] = ''

            i += 1

    return response

Finally, I removed the for loop from the build_messages function.

What I get as a result is the error: 'NoneType' object is not iterable.

Is this the correct way of doing this?

mallix
  • @GhostCat I submitted the question and forgot to write down what I have tried. Here is some code which works, below is some code that supposed to work but it doesn't. Basically I am trying to multi-process the build_messages function. – mallix Sep 28 '16 at 15:03
  • Great. Now let the experts come in to help you ;-) – GhostCat Sep 28 '16 at 15:20
  • You haven't said where you're getting that error. From what I can gather, this is using the `mailchimp` API and I'm assuming that the longest wait is for an API response? – roganjosh Sep 28 '16 at 15:51
  • @roganjosh ignore the mailchimp API call, it takes 1 second to finish for a thousand mails. The error is coming from build_messages. – mallix Sep 28 '16 at 15:53
  • @mallix but from where in `build_messages`? If you removed the `for` loop then I can't see anywhere where you are iterating? – roganjosh Sep 28 '16 at 15:54
  • @roganjosh do I need to iterate when I m trying to implement this via threads? – mallix Sep 28 '16 at 15:56
  • More what I'm getting at: rather than trying to do this through multithreading in `build_messages`, can you isolate the part of the code that takes so long? Because the only thing I can see is `get_user_stream` and `compile_user_stream`, for which we cannot see the contents. Before battling with multithreading, it's worth seeing if it will give you any speedup. Often it won't. – roganjosh Sep 28 '16 at 15:56
  • For example, this: http://stackoverflow.com/questions/39728974/is-my-python-multithreading-code-affected-by-the-global-interpreter-lock/39729183?noredirect=1#comment66756464_39729183 – roganjosh Sep 28 '16 at 15:57
  • @roganjosh yes get_user_stream is the reason. Mainly because get_reference is another database call. – mallix Sep 28 '16 at 16:00
  • We might be going down a rabbit hole on this _specific_ example :) Is `get_information` querying the db via `requests` library? If so, I could give you some generic code so you can try split those calls across multiple processes (`multiprocessing` rather than `multithreading`) i.e. each process runs in parallel and only has to do 250 calls... close to 1/4 of the time, plus a bit of overhead. It wouldn't be a complete answer but might put you on the right path. – roganjosh Sep 28 '16 at 16:08
  • @roganjosh I am using psycopg2 for Postgres for querying the database. This sounds interesting though. – mallix Sep 28 '16 at 16:15
  • @mallix I'll prepare something more generic then and give some pointers; I don't know if someone could give you some thing more specific eventually. It will take me a little while. – roganjosh Sep 28 '16 at 16:21

1 Answer


Your code has many layers, so there is no guarantee that applying multithreading at a high level will give you any performance gain. It is worth digging down to the step that contributes the most latency and working out how to approach that specific bottleneck. See here for a longer discussion of threading limitations.
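One way to find that step is to accumulate wall-clock time per stage. The following is only a minimal sketch: `timed`, `slow_lookup` and `fast_step` are hypothetical names, and the `time.sleep` call merely stands in for a database round trip.

```python
import time
from collections import defaultdict
from contextlib import contextmanager

# Total seconds spent in each labelled stage
timings = defaultdict(float)


@contextmanager
def timed(label):
    ''' Add the time spent inside the with-block to timings[label] '''
    start = time.time()
    try:
        yield
    finally:
        timings[label] += time.time() - start


def slow_lookup():
    ''' Hypothetical stand-in for a database call '''
    time.sleep(0.01)


def fast_step():
    ''' Hypothetical stand-in for cheap in-memory processing '''
    return sum(range(100))


for _ in range(5):
    with timed('lookup'):
        slow_lookup()
    with timed('processing'):
        fast_step()

# The label with the largest accumulated time is the bottleneck candidate
slowest = max(timings, key=timings.get)
print("Slowest stage: %s" % slowest)
```

Wrapping the calls inside your own loop in the same way should show which one dominates the runtime before you commit to any parallel design.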

If, as we discussed in the comments, you can pinpoint a single task that is taking a long time, you could try to parallelize it using multiprocessing instead, to leverage more of your CPU power. Here is a generic example that is hopefully simple enough to mirror your Postgres queries without going into your own code base; adapting your actual code would be an unfeasible amount of effort, to be honest.

import multiprocessing as mp
import time
import random
import datetime as dt

MAILCHIMP_RESPONSE = [x for x in range(1000)]

def chunks(l, n):
    n = max(1, n)
    return [l[i:i + n] for i in range(0, len(l), n)]


def db_query():
    ''' Delayed response from database '''
    time.sleep(0.01)
    return random.random()


def do_queries(query_list):
    ''' The function that takes all your query ids and executes them 
    sequentially for each id '''
    results = []
    for item in query_list:
        query = db_query()
        # Your super-quick processing of the Postgres response
        processing_result = query * 2
        results.append([item, processing_result])
    return results


def single_processing():
    ''' As you do now - equivalent to get_reference '''
    result_of_process = do_queries(MAILCHIMP_RESPONSE)
    return result_of_process


def multi_process(chunked_data, queue):
    ''' Same as single_processing, except we put our results in queue rather
    than returning them '''
    result_of_process = do_queries(chunked_data)
    queue.put(result_of_process)


def multiprocess_handler():
    ''' Divide and conquer on our db requests. We split the mailchimp response
    into a series of chunks and fire our queries simultaneously. Thus, each
    concurrent process has a smaller number of queries to make '''

    num_processes = 4 # depending on cores/resources
    # Ceiling division so that we never end up with more chunks than processes
    size_chunk = -(-len(MAILCHIMP_RESPONSE) // num_processes)
    chunked_queries = chunks(MAILCHIMP_RESPONSE, size_chunk)

    queue = mp.Queue() # This is going to combine all the results

    # One process per chunk, so no chunk is silently dropped
    processes = [mp.Process(target=multi_process, args=(chunk, queue))
                 for chunk in chunked_queries]

    for p in processes: p.start()

    divide_and_conquer_result = []
    for p in processes:
        divide_and_conquer_result.extend(queue.get())

    # Join only after draining the queue, otherwise a child can block on put()
    for p in processes: p.join()

    return divide_and_conquer_result


if __name__ == '__main__':
    start_single = dt.datetime.now()

    single_process = single_processing()

    # print(...) with a single argument behaves the same on Python 2.7 and 3
    print("Single process took {}".format(dt.datetime.now() - start_single))
    print("Number of records processed = {}".format(len(single_process)))

    start_multi = dt.datetime.now()

    multi = multiprocess_handler()

    print("Multi process took {}".format(dt.datetime.now() - start_multi))
    print("Number of records processed = {}".format(len(multi)))
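If you would rather keep the `ThreadPool` from the question, the same idea can be sketched per record instead of per chunk. This is a rough illustration under assumptions, not a drop-in fix: `fetch_stream`, `build_message` and `build_messages_parallel` are hypothetical names, and the sleep stands in for the real query. The important detail is that the worker returns its result; a worker without a `return` makes `pool.map` hand back a list of `None`.

```python
from multiprocessing.pool import ThreadPool
import time


def fetch_stream(record):
    ''' Hypothetical stand-in for the slow per-record database lookup '''
    time.sleep(0.001)
    return {'email': record['email'], 'stream': record['id'] * 2}


def build_message(record):
    ''' Build the entry for ONE record and return it '''
    data = fetch_stream(record)
    return {
        'rcpt': data['email'],
        'vars': [{'name': 'stream', 'content': data['stream']}]
    }


def build_messages_parallel(records, num_workers=4):
    ''' Map build_message over the records with a pool of threads.
    pool.map returns the results in the same order as the input '''
    pool = ThreadPool(num_workers)
    try:
        return pool.map(build_message, records)
    finally:
        pool.close()
        pool.join()


# Made-up records, only to exercise the sketch
records = [{'id': i, 'email': 'user%d@example.com' % i} for i in range(20)]
merge_vars = build_messages_parallel(records)
print("Built %d entries" % len(merge_vars))
```

Each worker builds and returns its own entry, and the shared structure is assembled once from the returned list, so no thread appends to a shared dict. Whether one database connection can safely be used from several threads depends on the driver, so check that before sharing it.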
roganjosh