
My use case is rather simple, or at least I feel like it is, coming from Node... but I just can't quite get my head around this problem. I need less help with the coding aspect of this issue, and more with just an explanation: I want to run an asynchronous process (a decrypt and encrypt function from the aws-encryption-sdk) multiple times (as many times as possible) in parallel. I guess my main question is, what's the best way to do this?

Follow-up question... it seems to me that the best way to do this would be an asynchronous function (in asyncio style) that performs the decryption and encryption, run by a pool of threads rather than a single one. At times, I'll need to run this function 2000 times as fast as possible (not necessarily in complete parallel, obviously). However, even if this is the right approach, I'm not certain how to implement it.

Thanks much to anybody who can help me. To give you an idea of what track I'm on, I was very much considering using the answer from this thread: https://stackoverflow.com/a/29280606/5335646. However, some sources say that approach is more for blocking functions, and not really for truly asynchronous ones like mine (a call to AWS KMS whose result is returned later).
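
For reference, here is roughly the pattern from that answer that I'm considering - a sketch only, with the aws-encryption-sdk call stubbed out by a placeholder and the pool size picked arbitrarily:

import asyncio
from concurrent.futures import ThreadPoolExecutor


def blocking_kms_call(text):
    # stand-in for a blocking aws-encryption-sdk encrypt/decrypt call
    return text[::-1]


async def run_all(loop, executor, payloads):
    # hand each blocking call to the thread pool and await them together
    futures = [loop.run_in_executor(executor, blocking_kms_call, p) for p in payloads]
    return await asyncio.gather(*futures)


if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=32)  # pool size is a guess
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(run_all(loop, executor, ['data'] * 2000))
    print(len(results), 'calls completed')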

Ryan Spicer
  • I think you need a better understanding of `concurrent` vs `parallel`. Parallel means everything is happening in different OS-level threads or processes at the exact same time. Concurrent means that any time something blocks, another task runs asynchronously. In Python, threads won't give you true parallelism, because the GIL prevents more than one thread from executing Python bytecode at once. Because your code is going to be CPU bound (encryption/decryption requires computation), you should take a parallel approach. If your code were IO bound (network IO, disk IO, etc.), it would be better to use asyncio. – Charles D Pantoga Sep 20 '17 at 21:13
  • As the one answer you've been given states, you should do this task in different processes. – Charles D Pantoga Sep 20 '17 at 21:14
  • I think you're mostly on track, but you're slightly mistaken - the process is actually network IO bound, I believe - my process is not actually performing the encryption, it's a module that sends the encryption/decryption process off to AWS KMS. This means that the actual encryption and decryption is happening off my machine, and I'm simply waiting for a response. – Ryan Spicer Sep 20 '17 at 21:18
  • Ah I see, well then in that case asyncio is a good choice. – Charles D Pantoga Sep 21 '17 at 00:48
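
To illustrate what the comments converge on: if the work is network-IO bound, a single event loop can keep thousands of calls in flight with no pool at all. A minimal sketch, with the KMS round-trip emulated by `asyncio.sleep` and a hypothetical cap of 100 concurrent requests:

import asyncio
import random


async def kms_call(text):
    # emulated network round-trip to AWS KMS
    await asyncio.sleep(random.random())
    return text[::-1]


async def bounded_call(semaphore, text):
    async with semaphore:  # cap the number of in-flight requests
        return await kms_call(text)


async def run_batch(n):
    semaphore = asyncio.Semaphore(100)  # hypothetical limit
    tasks = [bounded_call(semaphore, 'some_data_to_encrypt') for _ in range(n)]
    return await asyncio.gather(*tasks)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(run_batch(2000))
    print(len(results), 'calls completed')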

1 Answer


If you are using Python 3.5 or newer, you can use `async`/`await` statements.

import asyncio
import multiprocessing
import random
from concurrent.futures import ProcessPoolExecutor

count_received = 0
tests_count = 16


async def AWS_KMS_call(i, text, action='encrypt'):
    # emulate the network round-trip to AWS KMS with a random sleep
    print(i, multiprocessing.current_process().name, action)
    await asyncio.sleep(random.random())  # emulate some work
    return text[::-1]


async def encrypt(i, text):
    encrypted = await AWS_KMS_call(i, text, action='encrypt')
    print(i, multiprocessing.current_process().name, encrypted, 'encrypted')
    return encrypted


async def decrypt(i, text):
    # round-trip: encrypt first, then decrypt the result
    encrypted = await encrypt(i, text)
    decrypted = await AWS_KMS_call(i, encrypted, action='decrypt')
    print(i, multiprocessing.current_process().name, encrypted, 'decrypted')
    return decrypted


def pool_func(i, text):
    # runs inside a worker process: each worker gets its own event loop
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    decrypted = loop.run_until_complete(decrypt(i, text))
    print(i, multiprocessing.current_process().name, decrypted, 'pool_func exit')
    return i, decrypted


def got_result(future):
    # callback in the main process; stops the main loop once all results arrive
    global count_received
    global tests_count
    count_received += 1
    i, text = future.result()
    print(i, multiprocessing.current_process().name, text, 'done')

    if count_received == tests_count:
        loop.stop()  # 'loop' is the main-process loop created below


if __name__ == '__main__':
    text = 'some_data_to_encrypt'
    executor = ProcessPoolExecutor(16)

    loop = asyncio.get_event_loop()

    # dispatch each job to the process pool and attach a completion callback
    for i in range(tests_count):
        task = asyncio.ensure_future(loop.run_in_executor(executor, pool_func, i, text))
        task.add_done_callback(got_result)

    try:
        loop.run_forever()
    finally:
        loop.close()

I added several prints for clarity; the output looks like this:

0 Process-1 encrypt
1 Process-3 encrypt
2 Process-5 encrypt
4 Process-2 encrypt
5 Process-4 encrypt
6 Process-6 encrypt
7 Process-9 encrypt
8 Process-7 encrypt
9 Process-10 encrypt
10 Process-8 encrypt
3 Process-15 encrypt
11 Process-11 encrypt
12 Process-12 encrypt
13 Process-14 encrypt
15 Process-13 encrypt
14 Process-16 encrypt
1 Process-3 tpyrcne_ot_atad_emos encrypted
1 Process-3 decrypt
6 Process-6 tpyrcne_ot_atad_emos encrypted
6 Process-6 decrypt
15 Process-13 tpyrcne_ot_atad_emos encrypted
15 Process-13 decrypt
5 Process-4 tpyrcne_ot_atad_emos encrypted
5 Process-4 decrypt
3 Process-15 tpyrcne_ot_atad_emos encrypted
3 Process-15 decrypt
9 Process-10 tpyrcne_ot_atad_emos encrypted
9 Process-10 decrypt
1 Process-3 tpyrcne_ot_atad_emos decrypted
1 Process-3 some_data_to_encrypt pool_func exit
1 MainProcess some_data_to_encrypt done
4 Process-2 tpyrcne_ot_atad_emos encrypted
4 Process-2 decrypt
11 Process-11 tpyrcne_ot_atad_emos encrypted
11 Process-11 decrypt
10 Process-8 tpyrcne_ot_atad_emos encrypted
10 Process-8 decrypt
10 Process-8 tpyrcne_ot_atad_emos decrypted
10 Process-8 some_data_to_encrypt pool_func exit
10 MainProcess some_data_to_encrypt done
6 Process-6 tpyrcne_ot_atad_emos decrypted
6 Process-6 some_data_to_encrypt pool_func exit
6 MainProcess some_data_to_encrypt done
8 Process-7 tpyrcne_ot_atad_emos encrypted
8 Process-7 decrypt
12 Process-12 tpyrcne_ot_atad_emos encrypted
12 Process-12 decrypt
2 Process-5 tpyrcne_ot_atad_emos encrypted
2 Process-5 decrypt
15 Process-13 tpyrcne_ot_atad_emos decrypted
15 Process-13 some_data_to_encrypt pool_func exit
15 MainProcess some_data_to_encrypt done
11 Process-11 tpyrcne_ot_atad_emos decrypted
11 Process-11 some_data_to_encrypt pool_func exit
11 MainProcess some_data_to_encrypt done
13 Process-14 tpyrcne_ot_atad_emos encrypted
13 Process-14 decrypt
4 Process-2 tpyrcne_ot_atad_emos decrypted
4 Process-2 some_data_to_encrypt pool_func exit
4 MainProcess some_data_to_encrypt done
7 Process-9 tpyrcne_ot_atad_emos encrypted
7 Process-9 decrypt
0 Process-1 tpyrcne_ot_atad_emos encrypted
0 Process-1 decrypt
14 Process-16 tpyrcne_ot_atad_emos encrypted
14 Process-16 decrypt
8 Process-7 tpyrcne_ot_atad_emos decrypted
8 Process-7 some_data_to_encrypt pool_func exit
8 MainProcess some_data_to_encrypt done
5 Process-4 tpyrcne_ot_atad_emos decrypted
5 Process-4 some_data_to_encrypt pool_func exit
5 MainProcess some_data_to_encrypt done
3 Process-15 tpyrcne_ot_atad_emos decrypted
3 Process-15 some_data_to_encrypt pool_func exit
3 MainProcess some_data_to_encrypt done
9 Process-10 tpyrcne_ot_atad_emos decrypted
9 Process-10 some_data_to_encrypt pool_func exit
9 MainProcess some_data_to_encrypt done
13 Process-14 tpyrcne_ot_atad_emos decrypted
13 Process-14 some_data_to_encrypt pool_func exit
13 MainProcess some_data_to_encrypt done
2 Process-5 tpyrcne_ot_atad_emos decrypted
2 Process-5 some_data_to_encrypt pool_func exit
2 MainProcess some_data_to_encrypt done
0 Process-1 tpyrcne_ot_atad_emos decrypted
0 Process-1 some_data_to_encrypt pool_func exit
0 MainProcess some_data_to_encrypt done
12 Process-12 tpyrcne_ot_atad_emos decrypted
12 Process-12 some_data_to_encrypt pool_func exit
12 MainProcess some_data_to_encrypt done
14 Process-16 tpyrcne_ot_atad_emos decrypted
14 Process-16 some_data_to_encrypt pool_func exit
14 MainProcess some_data_to_encrypt done
7 Process-9 tpyrcne_ot_atad_emos decrypted
7 Process-9 some_data_to_encrypt pool_func exit
7 MainProcess some_data_to_encrypt done

Also, I suggest you look into the celery library. Maybe it will fit your requirements better.
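
A minimal sketch of what that could look like, assuming a Redis broker on localhost (the broker choice, module name, and task body are placeholders):

from celery import Celery

app = Celery('crypto_tasks', broker='redis://localhost:6379/0')


@app.task
def encrypt_task(text):
    # placeholder: call AWS KMS here; worker processes pull tasks from the queue
    return text[::-1]

Producers then enqueue work with encrypt_task.delay(text), and any number of workers (started with celery -A crypto_tasks worker) drain the queue in parallel.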

Yaroslav Surzhikov
  • Please, if you're able, check my recent comment in the main question and see if your answer is still relevant. – Ryan Spicer Sep 20 '17 at 21:18
  • Also, please bear in mind (I should've mentioned this) that this function is happening within an API call - I'm worried that the build-up of new processes would slow things down significantly with this method, given that consideration. But I could absolutely be wrong. – Ryan Spicer Sep 20 '17 at 21:21
  • @RyanSpicer I updated my answer, sorry for taking so long. As for spawning new processes - well, I think you should try it. – Yaroslav Surzhikov Sep 20 '17 at 23:21
  • Understood. Great writeup, thanks... one problem, however... when I use a ProcessPool (instead of a ThreadPool), it errors a LOT, specifically (off the top of my head) about pickling. Does this mean the function I'm trying to run simply isn't compatible with multiprocessing? – Ryan Spicer Sep 21 '17 at 01:31
  • @RyanSpicer I think the errors are raised because multiprocessing tries to serialize a socket object. If, for example, you use a class instance that has some connection attribute, and you try to spawn a worker from one of its instance methods - you will get an error. – Yaroslav Surzhikov Sep 21 '17 at 02:26
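
One common way around that pickling error is to create the unpicklable object inside each worker process instead of passing it from the parent. A sketch, using a boto3 KMS client purely for illustration:

import boto3  # illustrative; any client holding an unpicklable connection applies

_client = None


def _get_client():
    # lazily create the client inside the worker process; it is never sent
    # across the process boundary, so nothing unpicklable gets serialized
    global _client
    if _client is None:
        _client = boto3.client('kms')
    return _client


def pool_func(i, text):
    client = _get_client()
    # use the client here, e.g. client.encrypt(KeyId=..., Plaintext=text)
    return i, text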