
I have a question about performing calculations on a Python dictionary. In this case, the dictionary has millions of keys, and the lists are similarly long. There seems to be disagreement about whether parallelization can be used here, so I'll ask the question more explicitly. Here is the original question:

Optimizing parsing of massive python dictionary, multi-threading

This is a toy (small) python dictionary:

example_dict1 = {'key1':[367, 30, 847, 482, 887, 654, 347, 504, 413, 821],
    'key2':[754, 915, 622, 149, 279, 192, 312, 203, 742, 846], 
    'key3':[586, 521, 470, 476, 693, 426, 746, 733, 528, 565]}

Let's say I need to parse the values of the lists, which I've implemented into the following simple (toy) function:

def manipulate_values(input_list):
    return_values = []
    for i in input_list:
        new_value = i ** 2 - 13
        return_values.append(new_value)
    return return_values

Now, I can easily parse the values of this dictionary as follows:

for key, value in example_dict1.items():
    example_dict1[key] = manipulate_values(value)

resulting in the following:

example_dict1 = {'key1': [134676, 887, 717396, 232311, 786756, 427703, 120396, 254003, 170556, 674028], 
     'key2': [568503, 837212, 386871, 22188, 77828, 36851, 97331, 41196, 550551, 715703], 
     'key3': [343383, 271428, 220887, 226563, 480236, 181463, 556503, 537276, 278771, 319212]}

Question: Why couldn't I use multiple threads to do this calculation, e.g. three threads, one for key1, key2, and key3? Would concurrent.futures.ProcessPoolExecutor() work here?

Original question: Are there better ways to optimize this task to be quick?

EB2127
  • I'd do the following: 1) create a function that `returns example_dict1[key]= manipulate_values(example_dict1[key]))` and then use `multiprocessing.Pool.map` to map that function on `example_dict1.keys()`. – gnahum Mar 09 '20 at 06:19
  • @gnahum Thanks. Do you know the difference between this approach and using ProcessPoolExecutor()? Take a look at this answer here: https://stackoverflow.com/questions/30060088/python-how-to-parallelize-a-loop-with-dictionary – EB2127 Mar 09 '20 at 06:32
  • The `ProcessPoolExecutor()` seems to be syntactic sugar around `multiprocessing.Pool()`. – moooeeeep Mar 09 '20 at 09:18

2 Answers


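Python threads will not really help you process this in parallel: because of the Global Interpreter Lock (GIL), only one thread executes Python bytecode at a time, so CPU-bound threads effectively share one "real CPU thread". Python threads are helpful for I/O-bound work, such as waiting on HTTP calls.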
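To illustrate the point about threads, here is a minimal sketch using `concurrent.futures.ThreadPoolExecutor` on the toy dictionary from the question. It assumes the standard, GIL-enabled CPython interpreter, and the name `manipulate_item` is made up for this example. The threads return the correct result, but for pure-Python arithmetic like this they are not expected to beat the plain loop.

```python
from concurrent.futures import ThreadPoolExecutor

example_dict1 = {
    'key1': [367, 30, 847, 482, 887, 654, 347, 504, 413, 821],
    'key2': [754, 915, 622, 149, 279, 192, 312, 203, 742, 846],
    'key3': [586, 521, 470, 476, 693, 426, 746, 733, 528, 565],
}


def manipulate_item(k_v):
    # Hypothetical helper: takes a (key, list) pair, returns (key, new list).
    k, v = k_v
    return k, [i ** 2 - 13 for i in v]


# One thread per key, as asked in the question. This works, but under the GIL
# only one thread runs Python bytecode at a time, so there is no CPU speedup.
with ThreadPoolExecutor(max_workers=3) as executor:
    threaded_result = dict(executor.map(manipulate_item, example_dict1.items()))

# Plain serial computation, for comparison.
serial_result = {k: [i ** 2 - 13 for i in v] for k, v in example_dict1.items()}
print(threaded_result == serial_result)
```

Timing both variants on your own data (for example with `%timeit`) is the only reliable way to see how much, or how little, the threads change.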

About ProcessPoolExecutor, from the docs:

concurrent.futures.ProcessPoolExecutor()

The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.

It can help if your workload is CPU-heavy. You can use:

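import concurrent.futures


def manipulate_values(k_v):
    # Each work item is a (key, list) pair so the key survives the round trip.
    k, v = k_v
    return_values = []
    for i in v:
        new_value = i ** 2 - 13
        return_values.append(new_value)
    return k, return_values


# The guard is needed on platforms that start worker processes with "spawn".
if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor() as executor:
        example_dict = dict(executor.map(manipulate_values, example_dict1.items()))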

Here is a simple benchmark comparing a plain for loop with ProcessPoolExecutor. My scenario assumes that each item needs ~50 ms of CPU time (simulated here with time.sleep(0.05)):

[Image: benchmark plot (x-axis: number of keys in dict)]

You can see the real benefit of ProcessPoolExecutor when the CPU time per item is high.

from simple_benchmark import BenchmarkBuilder
import time
import concurrent.futures

b = BenchmarkBuilder()

def manipulate_values1(k_v):
    k, v = k_v
    time.sleep(0.05)
    return k, v

def manipulate_values2(v):
    time.sleep(0.05)
    return v

@b.add_function()
def test_with_process_pool_executor(d):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        return dict(executor.map(manipulate_values1, d.items()))

@b.add_function()
def test_simple_for_loop(d):
    for key, value in d.items():
        # Pass only the list; passing (key, value) would store the tuple back.
        d[key] = manipulate_values2(value)


@b.add_arguments('Number of keys in dict')
def argument_provider():
    for exp in range(2, 10):
        size = 2**exp
        yield size, {i: [i] * 10_000 for i in range(size)}

r = b.run()
r.plot()

If you do not set the number of workers for ProcessPoolExecutor, the default number of workers equals the number of processors on your machine (for the benchmark I used a PC with 8 CPUs).
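As a minimal sketch of controlling the worker count explicitly, the snippet below passes `max_workers` to `ProcessPoolExecutor` and a `chunksize` to `executor.map`. The helper `pick_worker_count` and the sample data are made up for this example, and the values 2 and 10 are arbitrary placeholders, not recommendations.

```python
import concurrent.futures
import os


def manipulate_values(k_v):
    # Same toy computation as in the answer, applied to a (key, list) pair.
    k, v = k_v
    return k, [i ** 2 - 13 for i in v]


def pick_worker_count(requested=None):
    # Hypothetical helper: use the requested count, else the processor count.
    # os.cpu_count() may return None, so fall back to 1.
    if requested is not None:
        return max(1, requested)
    return os.cpu_count() or 1


if __name__ == "__main__":
    data = {i: [i] * 10 for i in range(100)}
    workers = pick_worker_count(2)
    with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
        # chunksize > 1 sends several items to a worker per task.
        result = dict(executor.map(manipulate_values, data.items(), chunksize=10))
    print(len(result))
```

Whether a larger `chunksize` helps depends on the data, so it is worth benchmarking rather than assuming.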


But in your case, with the data provided in your question, processing one item takes only ~2.3 µs:

%timeit manipulate_values([367, 30, 847, 482, 887, 654, 347, 504, 413, 821])
2.32 µs ± 25.8 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

In that case the benchmark looks like this:

[Image: benchmark plot (x-axis: number of keys in dict)]

So it is better to use a simple for loop if the CPU time for one item to be processed is low.
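The trade-off can be made concrete with a rough back-of-the-envelope model. This is only a sketch: the function `estimated_speedup`, the formula in it, and the per-item overhead of 100 µs are assumptions chosen for illustration, not measurements. Only the ~50 ms and ~2.32 µs per-item figures come from the text above.

```python
def estimated_speedup(n_items, cpu_s_per_item, overhead_s_per_item, workers):
    # Rough model (an assumption, not a measurement):
    #   serial time   = n * cpu
    #   parallel time = n * (cpu / workers + overhead)
    serial = n_items * cpu_s_per_item
    parallel = n_items * (cpu_s_per_item / workers + overhead_s_per_item)
    return serial / parallel


# 100e-6 s of per-item overhead is a made-up placeholder value.
heavy = estimated_speedup(1000, 0.05, 100e-6, workers=8)     # ~50 ms per item
light = estimated_speedup(1000, 2.32e-6, 100e-6, workers=8)  # ~2.32 µs per item
print(f"heavy items: {heavy:.2f}x, light items: {light:.3f}x")
```

Under these assumptions the heavy items come out above 1x (a speedup) and the light items far below 1x (a slowdown), which matches the direction of the two benchmarks.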


A good point raised by @user3666197 is the case of huge items/lists. I benchmarked both approaches using lists of 1_000_000 random numbers each:

[Image: benchmark plot (x-axis: number of keys in dict)]

As you can see, in this case ProcessPoolExecutor is the more suitable choice.

from simple_benchmark import BenchmarkBuilder
import time
import concurrent.futures
from random import choice

b = BenchmarkBuilder()

def manipulate_values1(k_v):
    k, v = k_v
    return_values = []
    for i in v:
        new_value = i ** 2 - 13
        return_values.append(new_value)

    return k, return_values

def manipulate_values2(v):
    return_values = []
    for i in v:
        new_value = i ** 2 - 13
        return_values.append(new_value)
    return return_values

@b.add_function()
def test_with_process_pool_executor(d):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        return dict(executor.map(manipulate_values1, d.items()))

@b.add_function()       
def test_simple_for_loop(d):
    for key, value in d.items():
        d[key] = manipulate_values2(value)


@b.add_arguments('Number of keys in dict')
def argument_provider():
    for exp in range(2, 5):
        size = 2**exp
        yield size, {i: [choice(range(1000)) for _ in range(1_000_000)] for i in range(size)}

r = b.run()
r.plot()

This is expected, since processing one item takes ~209 ms:

l = [367] * 1_000_000
%timeit manipulate_values2(l)
# 209 ms ± 1.45 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Still, the fastest option is to use NumPy arrays with the plain for loop:

[Image: benchmark plot (x-axis: number of keys in dict)]

from simple_benchmark import BenchmarkBuilder
import time
import concurrent.futures
import numpy as np

b = BenchmarkBuilder()

def manipulate_values1(k_v):
    k, v = k_v
    return k,  v ** 2 - 13

def manipulate_values2(v):
    return v ** 2 - 13

@b.add_function()
def test_with_process_pool_executor(d):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        return dict(executor.map(manipulate_values1, d.items()))

@b.add_function()       
def test_simple_for_loop(d):
    for key, value in d.items():
        d[key] = manipulate_values2(value)


@b.add_arguments('Number of keys in dict')
def argument_provider():
    for exp in range(2, 7):
        size = 2**exp
        yield size, {i: np.random.randint(0, 1000, size=1_000_000) for i in range(size)}

r = b.run()
r.plot()

The simple for loop is expected to be faster here, since processing one NumPy array takes less than 1 ms:

def manipulate_values2(input_list):
    return input_list ** 2 - 13

l = np.random.randint(0, 1000, size=1_000_000)
%timeit manipulate_values2(l)
# 951 µs ± 5.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
user3666197
kederrac
  • Thank you for such a comprehensive answer! This is great. My question is, I understand your second test, but I do not understand the phrase "CPU time per time". What do you mean? – EB2127 Mar 09 '20 at 11:25
  • CPU time per item? how much CPU time it is necessary to process one item, in your case: how much CPU time it is necessary to process one list – kederrac Mar 09 '20 at 11:26
  • @EB2127 I changed my last sentence, I hope is better – kederrac Mar 09 '20 at 11:29
  • I think I understand now. "CPU time per item" is the CPU time necessary to compute one item---if this is high, then using multiple processors will have benefits. If CPU time per item is low, multiple processors will not be beneficial. Right? – EB2127 Mar 09 '20 at 11:32
  • Thanks for this answer---I learned a lot. Also, there might be something better than `concurrent.futures.ProcessPoolExecutor()`---I just found this. It's possible there are other ways to use multi-processing in Python which side-steps the GIL. There could be a better option here....I don't know. – EB2127 Mar 09 '20 at 11:36
  • @EB2127 you could use [multiprocessing](https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing) but the answer will be the same since `concurrent.futures.ProcessPoolExecutor()` it is using the `multiprocessing` module – kederrac Mar 09 '20 at 11:38
  • @EB2127 short answer, there is no other option using python built-in tooling – kederrac Mar 09 '20 at 11:40
  • Thanks. Yes, I was curious whether there was another `multiprocessing` tool available, given arguments below about an overhead from serialization/de-serialization....but I have tried this out. Thank you for the help! This was really useful – EB2127 Mar 09 '20 at 11:44
  • @kederrac would you mind to complete the micro-benchmarks **for objects, that are indeed congruent with the O/P announced sizing** - i.e. ~ millions of times repeated **list-processing**, each one passed to the spawned process, **having millions of list-items** ( causing also millions of remote list-appends etc ) -- these numbers matter, as the scaling-dependent processing overheads dominate over a trivial, lightweight work-unit ( for which an indeed vectorised, numpy-efficient block-operations would not have any pythonic competition, would they? ) – user3666197 Mar 09 '20 at 12:20
  • so you suggest to benchmark against lists with millions of items? – kederrac Mar 09 '20 at 12:34
  • @kederrac Sure, **first :** the O/P will work on that scale + **second :** the benchmarks in my answer below did conform to that sizing --- **It is both fair & serious to compare apples to apples, isn't it?** – user3666197 Mar 09 '20 at 12:45
  • @user3666197 now it is better? – kederrac Mar 09 '20 at 14:23
  • Did I read correctly the meta-language of the test-decorators, that the graphs scale in keys-in-dicts, yet do not scale to O/P defined list-sizes, being well above 1E6+, so that the SER/DES add-on overhead costs get indeed visible in real scales? **The growth of adverse effects of the list-size driven dependencies is the key in the story :o) so why not depicting that?** The numpy-efficiency dominance will the more get sense in contrast to those characteristics, will that not? :o) – user3666197 Mar 10 '20 at 06:56
  • @kederrac This answer was very useful---I am running into some memory problems, so I'm trying to do the same with generators. You may be interested https://stackoverflow.com/questions/60798407/multiprocessing-with-a-generator-functions-typeerror-cannot-pickle-generator – EB2127 Mar 22 '20 at 11:23

Q : "Why couldn't I use multiple threads to do this calculation, e.g. three threads, one for key1, key2, and key3?"

You could, but with no meaningful effect on performance. Knowing how Python handles thread-based execution is essential here. Read about the GIL (Global Interpreter Lock), which exists precisely to prevent concurrent execution of Python bytecode, and about its effect on performance, and you have the WHY part.

Q : "Would concurrent.futures.ProcessPoolExecutor() work here?"

It would.

Yet the net effect (whether it is any faster than a pure-[SERIAL] flow of processing) will depend on the size of the "large" lists (as warned above, cit.: "millions of keys, and the lists are similarly long"), which have to be copied (RAM-I/O) and passed (SER/DES-processed + IPC-transferred) to the pool of spawned (process-based) remote executors.

These many times repeated RAM-I/O + SER/DES add-on overhead costs will soon dominate.

A RAM-I/O copy step:

>>> from zmq import Stopwatch; aClk = Stopwatch()

>>> aClk.start(); aList = [ i for i in range( int( 1E4 ) ) ]; aClk.stop()
   1345 [us] to copy a List of 1E4 elements
>>> aClk.start(); aList = [ i for i in range( int( 1E5 ) ) ]; aClk.stop()
  12776 [us] to copy a List of 1E5 elements
>>> aClk.start(); aList = [ i for i in range( int( 1E6 ) ) ]; aClk.stop()
 149197 [us] to copy a List of 1E6 elements
>>> aClk.start(); aList = [ i for i in range( int( 1E7 ) ) ]; aClk.stop()
1253792 [us] to copy a List of 1E7 elements
|  |::: [us]
|  +--- [ms]
+------ [ s]

SER/DES step :

>>> import pickle
>>> aClk.start(); _ = pickle.dumps( aList ); aClk.stop()
 608323 
 615851
 638821 [us] to pickle.dumps() a List of 1E7 elements
|  |::: [us]
|  +--- [ms]
+------ [ s]
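The same kind of measurement can be sketched with the standard library alone. This version uses `time.perf_counter` instead of the `zmq` Stopwatch used above (a substitution, not the setup that produced the figures), and the function name `time_pickle_round_trip` is made up for this example. The absolute numbers will differ from machine to machine.

```python
import pickle
import time


def time_pickle_round_trip(n_items):
    # Returns (seconds for dumps, seconds for loads, restored list).
    data = list(range(n_items))
    t0 = time.perf_counter()
    payload = pickle.dumps(data)       # SER: what is sent to a worker
    t1 = time.perf_counter()
    restored = pickle.loads(payload)   # DES: what the worker has to do
    t2 = time.perf_counter()
    return t1 - t0, t2 - t1, restored


for n in (10**4, 10**5, 10**6):
    dump_s, load_s, restored = time_pickle_round_trip(n)
    print(f"{n:>8} items: dumps {dump_s * 1e6:.0f} us, loads {load_s * 1e6:.0f} us")
```

The point of interest is how the cost grows with the list size, not any single figure.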

So the expected per-batch add-on overhead is ~ 2 x ( 1253 + 608 ) [ms] + IPC-transfer costs for just one shot of 1E7 items.
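Spelled out as arithmetic, using only the two rounded figures measured above; the variable names are made up for this sketch, and IPC-transfer time is left out because it was not measured:

```python
# Figures taken from the measurements above, in milliseconds.
copy_ms = 1253      # building a list of 1E7 elements
pickle_ms = 608     # pickle.dumps() of that list

# Assumption made in the estimate: the same cost is paid once on the way
# "there" and once on the way "back"; IPC-transfer time is not included.
overhead_ms = 2 * (copy_ms + pickle_ms)
print(overhead_ms)  # 3722
```

That is roughly 3.7 seconds of overhead per 1E7-item batch before any useful work is counted.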

The actual useful-work payload of manipulate_values() is so small that it can hardly justify the lump sum of all the add-on costs associated with distributing the work-units across the pool of remote workers. Much better results are to be expected from vectorised forms of computing. The add-on costs here are far larger than the small amount of useful work.

The whole scheme depends all the more on the overhead costs of SER/DES for passing parameters "there", plus the add-on costs of SER/DES for the results being returned "back". Together these decide the net effect (anti-speedups << 1.0 x are quite often observed in use-cases built on poor design-side engineering practices; no late benchmark can salvage the man*days already burnt on such a design decision).

user3666197
  • `ProcessPoolExecutor` uses processes, not threads. Thus no GIL contention there. However, there is additional serialization and deserialization going on... – moooeeeep Mar 09 '20 at 09:08
  • Mea Culpa - yes, given the size of lists passed to the pool of ( process-based ) remote executors, the SER/DES add-on overhead costs will dominate. The more the schema will depend on the overhead costs of SER/DES parameters passing *"there"* plus the add-on costs of the SER/DES on results being returned *"back"* - all of which altogether will decide on the net-effect ( ***anti*-speedups `<< 1.0 x`** are quite often observed on use-cases, introduced with but a poor design-side engineering practices, no late benchmarks can salvage the already burnt man*days, wasted in such poor design decision ) – user3666197 Mar 09 '20 at 09:39