1

I'm trying to parallelize the subsetting of a Python dictionary. The code below creates a new dictionary, positions_sub, based on if the keys in positions dictionary are found in a list, node_list:

positions_sub = {}
for k,v in positions.items():
    if k in node_list:
        positions_sub[k] = v

This code works just fine and does exactly what I want. However, it takes a while to run so I'm trying to parallelize it. I was trying to do this in the code below, but it returns positions_sub as a list of dictionaries, which isn't what I want. There are also some issues with the number of values per key. Any ideas how to get this working? Thanks!

from joblib import Parallel, delayed

def dict_filter(k,v):
    if k in node_list:
        positions_sub[k] = v
    return positions_sub
positions_sub = Parallel(n_jobs=-1,)(delayed(dict_filter)(k,v)for k,v in positions.items())
natnay
  • 460
  • 1
  • 5
  • 24
  • **How big is the `positions_sub`** dictionary and how large memory does the **python** process actually have allocated ( occupy ) at the moment of the call to the **`Parallel()(delayed(...)...)`**- both state in **`[GB]`**, ok? – user3666197 Jul 09 '19 at 17:47
  • @user3666197 This code is part of a bigger loop. Each iteration of `positions_sub` is around 300k and `positions` is approximately 1.2M entries. I'm using a server with 72GB memory – natnay Jul 09 '19 at 18:25

2 Answers2

2

Before you resort to parallelization you should make sure you are using the right data structure for each task: Remember that x in list is essentially O(n) whereas x in set (and also x in dict) is more like O(1). Therefore just converting your node_list to a set can improve the performance tremendously.

node_list = set(node_list)
positions_sub = {}
for k,v in positions.items():
    if k in node_list:
        positions_sub[k] = v

An other thing to consider is the ratio between len(positions) and len(node_list). If one is substantially smaller than the other you should always iterate over the smaller one.


EDIT: some code for performance comparisons

import random
import timeit
import functools

def generate(n_positions=1000, n_node_list=100):
    positions = { i:i for i in random.sample(range(n_positions), n_positions) }
    node_list = random.sample(range(max(n_positions, n_node_list)), n_node_list)
    return positions, node_list  

def validate(variant):
    data = generate(1000, 100)
    if sorted(data[1]) != sorted(k for k in variant(*data)):
        raise Exception(f"{variant.__name__} failed")

def measure(variant, data, repeats=1000):
    total_seconds = timeit.Timer(functools.partial(variant, *data)).timeit(repeats)
    average_ms = total_seconds / repeats * 1000
    print(f"{variant.__name__:10s} took an average of {average_ms:0.2f}ms per pass over {repeats} passes" )   


def variant1(positions, node_list):
    positions_sub = {}
    for k,v in positions.items():
        if k in node_list:
            positions_sub[k] = v
    return positions_sub

def variant1b(positions, node_list):
    node_list = set(node_list)
    positions_sub = {}
    for k,v in positions.items():
        if k in node_list:
            positions_sub[k] = v
    return positions_sub

def variant2(positions, node_list):
    return {k:v for k,v in positions.items() if k in node_list}

def variant2b(positions, node_list):
    node_list = set(node_list)
    return {k:v for k,v in positions.items() if k in node_list}

def variant3(positions, node_list):
    return {k:positions[k] for k in node_list if k in positions}



if __name__ == "__main__":
    variants = [variant1,variant1b,variant2,variant2b,variant3]
    for variant in variants:
        validate(variant)      

    n_positions = 4000
    n_node_list = 1000
    n_repeats = 100
    data = generate(n_node_list, n_node_list)
    print(f"data generated with len(positions)={n_positions} and len(node_list)={n_node_list}")
    for variant in variants:
        measure(variant, data, n_repeats)

EDIT2: as requested, here some results on my machine

first run:
data generated with len(positions)=4000 and len(node_list)=1000
variant1   took an average of 6.90ms per pass over 100 passes
variant1b  took an average of 0.22ms per pass over 100 passes
variant2   took an average of 6.95ms per pass over 100 passes
variant2b  took an average of 0.12ms per pass over 100 passes
variant3   took an average of 0.19ms per pass over 100 passes

second run:
data generated with len(positions)=40000 and len(node_list)=10000
variant1   took an average of 738.23ms per pass over 10 passes
variant1b  took an average of   2.04ms per pass over 10 passes
variant2   took an average of 739.51ms per pass over 10 passes
variant2b  took an average of   1.52ms per pass over 10 passes
variant3   took an average of   1.85ms per pass over 10 passes

Note that n=len(positions) and m=len(node_list) have been selected such that the ratio n/m=4 is roughly equivalent to that of the original data which has been specified by OP as 1.2M for n and 300K for m.

Observe the effect of scaling up by a factor of 10 from the first to the second run: Where in the first run variant1b is about 31 times faster than variant1, in the second run it is 361 times faster! This is the expected result of reducing the complexity of the k in node_list from O(m) to O(1). The total time complexity of variant1 is n*m = 0.25*n^2 = O(n^2) whereas variant1b has only n*1 = O(n). This means that for every order of magnitude that n increases, variant1b is also an order of magnitude faster than variant1.

That a similar performance improvement can be achieved by parallelization alone is rather doubtful, as by and large the expected performance gain of an embarrassingly parallelizable problem is a multiple of the available CPUs, which is still a constant factor and nowhere near the gain of improving the algorithm from O(n^2) to O(n).

Also, while IMHO the given problem falls into the class of embarrassingly parallelizable problems, the output must be aggregated after the parallel processing before it can be used. Furthermore I'm quite unfamiliar with joblib which is why I have skipped adding it to the comparison.

user3666197
  • 1
  • 6
  • 50
  • 92
PeterE
  • 5,715
  • 5
  • 29
  • 51
  • Thanks for your comment and the detailed response! I didn't know about using a set vs list for performance gains. I have tested this and it is definitely faster. Also thank you for providing the comparison functions, good to see the gains. – natnay Jul 09 '19 at 19:35
  • @PeterE Could you, kindly, **a)** post here also your **`localhost`** generated processing outputs for enabling a coherent comparison of the proposed solution (a consistent reference) and **b)** comment, why you have excluded the wish to let the processing run in parallel pool of processes, as directed by `Parallel(n_jobs=-1,)(delayed(...)...)` parallel-execution constructor ( or add such approach to the post and benchmark outputs ). While a different iterator O(1) / O(n) scaling is of a value, you avoided `n_jobs` scaling. **That is fair to solve the original O/P defined problem, isn't it?** – user3666197 Jul 09 '19 at 19:58
  • @user3666197 I think you are missing the point. Going from O(n) to O(1) is vastly more effective than any parallelization. To be fair, adding a parallelized variant would be interesting. But I am unfamiliar with joblib, so I skipped that. – PeterE Jul 09 '19 at 21:11
  • @PeterE Thx for completed results. For testing I normally use a [us]-timing, with recording the whole cohort of results ( noisy effects, potentially dangerous for real-world code-execution in production, are not masked by the averaging ) and avoid posting just average, but rather a bit more complete picture of the code-execution from [ min, mean, max ] ( for a simple benchmarking template / source-code example may look into https://stackoverflow.com/a/56915941 for details ). All **tests start to have sense once evicted from cache-artifacts above 1E9+ scales. We compare apples to apples there** – user3666197 Jul 09 '19 at 21:45
  • @user3666197 Yeah, your are not wrong that my measurements are not particularly precise nor comprehensive, on the other hand this is a fairly simple problem/algorithm so there should not be much variance. (Which is kind of a point in your favor, as I can't prove it with just the numbers I have produced so far :-) – PeterE Jul 09 '19 at 21:57
  • @PeterE partly agreed. The core message was the scale. All 10k or 40k-item-scaled objects sit inside the L3-cache at worst, so the 2nd+ tests benefit from having all the data "nearer", than the first run, which is never fair. Having 1E9+ items in the scaled enough objects, the tests start to have meaning, as neither of the test-runs has an unfair dis-advantage to pay the full-fetch costs ( and leaves the data in L3/L2/L1-cache for the good of all its test-"competitors". That is not fair, is it :o) Best regards + thanks for posting the results for your proposed approaches. – user3666197 Jul 09 '19 at 22:04
0

You can use asyncio. (Documentation can be found [here][1]). It is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web-servers, database connection libraries, distributed task queues, etc. Plus it has both high-level and low-level APIs to accomodate any kind of problem.

import asyncio

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

    return wrapped

@background
def your_function(argument):
    #code

Now this function will be run in parallel whenever called without putting main program into wait state. You can use it to parallelize for loop as well. When called for a for loop, though loop is sequential but every iteration runs in parallel to the main program as soon as interpreter gets there.

For your specific case you can do:

import asyncio
import time


def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)                                   
    return wrapped
    
@background
def add_to_dictionary(k,v):
    time.sleep(1) # Added Sleep to better demonstrate parallelization
    print(f"function called for {k=}\n", end='')
    if k in node_list:
        positions_sub[k] = v

# Random data to demonstrate parallelization
positions = {i:i for i in range(20)}
node_list = [key for key in positions if not key%3 or not key%5]
print(f"{positions=}, {node_list=}")

positions_sub = dict()

loop = asyncio.get_event_loop() # Have a new event loop

looper = asyncio.gather(*[add_to_dictionary(k,v) for k, v in positions.items()])  
# Run the loop
                             
results = loop.run_until_complete(looper) # Wait until finish


print('loop finished')
print(f"{positions_sub=}")

This produces following output:

positions={0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8, 9: 9, 10: 10, 11: 11, 12: 12, 13: 13, 14: 14, 15: 15, 16: 16, 17: 17, 18: 18, 19: 19},
node_list=[0, 3, 5, 6, 9, 10, 12, 15, 18]
function called for k=0
function called for k=6
function called for k=5
function called for k=4
function called for k=2
function called for k=1
function called for k=3
function called for k=7
function called for k=11
function called for k=10
function called for k=8
function called for k=15
function called for k=14
function called for k=12
function called for k=9
function called for k=13
function called for k=19
function called for k=18
function called for k=17
function called for k=16
loop finished
positions_sub={3: 3, 6: 6, 5: 5, 0: 0, 10: 10, 15: 15, 9: 9, 12: 12, 18: 18}
Hamza
  • 5,373
  • 3
  • 28
  • 43