6

I am creating a dictionary as follows:

y=[(1,2),(2,3),(1,2),(5,6)]

dict={}

for tup in y:
    tup=tuple(sorted(tup))
    if tup in dict.keys():
        dict[tup]=dict[tup]+1
    else:
        dict[tup]=1

However, my actual y contains about 40 million tuples. Is there a way to use multiprocessing to speed up this process?

Thanks

laila

5 Answers

3

If you want to get the counts ignoring order, use a frozenset with Counter:

from collections import Counter

print(Counter(map(frozenset, y)))

Using the tuples from another answer:

In [9]: len(tuples)
Out[9]: 500000

In [10]: timeit Counter(map(frozenset, tuples))
1 loops, best of 3: 582 ms per loop

Using a frozenset will mean (1, 2) and (2,1) will be considered the same:

In [12]: y = [(1, 2), (2, 3), (1, 2), (5, 6),(2, 1),(6,5)]

In [13]: from collections import Counter


In [14]: print(Counter(map(frozenset, y)))
Counter({frozenset({1, 2}): 3, frozenset({5, 6}): 2, frozenset({2, 3}): 1})

If you apply the same logic using multiprocessing it will obviously be considerably faster; even without multiprocessing, this approach beats what has been provided in the answers that do use multiprocessing.
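
For reference, here is a minimal sketch of what "the same logic using multiprocessing" could look like; it is not part of the original answer, and the chunk size, the count_chunk helper, and the default-sized Pool (one worker per CPU core) are illustrative choices:

from collections import Counter
from functools import reduce
from multiprocessing import Pool

def count_chunk(chunk):
    # count unordered pairs within one chunk
    return Counter(map(frozenset, chunk))

def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

if __name__ == "__main__":
    y = [(1, 2), (2, 3), (1, 2), (5, 6), (2, 1), (6, 5)] * 100000
    with Pool() as pool:
        partials = pool.map(count_chunk, chunks(y, 100000))
    total = reduce(Counter.__add__, partials)  # merge the per-chunk counts
    print(total.most_common(3))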

Padraic Cunningham
    This should be the accepted answer, @mrucci version does not works with unordered tuples. And mine just shows the "how" without extenting a better approach. – Cyrbil Dec 11 '15 at 14:52
  • True, I didn't notice the requirement on unordered tuples. But the question asked about "a way to use the multiprocessing" and nobody bothered to provide actual examples on how to parallelize what he wanted to do. – mrucci Dec 11 '15 at 18:52
  • Thanks, I used this answer, as I wanted a speed-up of the implementation. – laila Dec 15 '15 at 14:12
2

You can follow a MapReduce approach.

from collections import Counter
from multiprocessing import Pool

NUM_PROCESSES = 8

y = [(1,2),(2,3),(1,2),(5,6)] * 10

## http://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks-in-python
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in xrange(0, len(l), n):
        yield l[i:i+n]

## map
partial_counters = Pool(NUM_PROCESSES).map(Counter, chunks(y, NUM_PROCESSES))

## reduce
reduced_counter = reduce(Counter.__add__, partial_counters)

## Result is:
## Counter({(1, 2): 20, (5, 6): 10, (2, 3): 10})

The idea is:

  1. split your input list into chunks
  2. feed each chunk to a separate process that will independently compute the counts
  3. merge all partial counts together via a reduction operation.

EDIT: use chunks(map(frozenset, y), NUM_PROCESSES) to account for unordered pairs.
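
The snippet above is written for Python 2 (xrange, the built-in reduce). As a hedged sketch only, not the original code, the same MapReduce idea in Python 3 with the frozenset adjustment for unordered pairs might look like this (map() returns an iterator in Python 3, so it has to be materialised before chunking):

from collections import Counter
from functools import reduce
from multiprocessing import Pool

NUM_PROCESSES = 8

y = [(1, 2), (2, 3), (1, 2), (5, 6)] * 10

def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

if __name__ == "__main__":
    unordered = list(map(frozenset, y))  # frozenset makes (1, 2) and (2, 1) equal

    ## map: count each chunk in a separate worker process
    with Pool(NUM_PROCESSES) as pool:
        partial_counters = pool.map(Counter, chunks(unordered, NUM_PROCESSES))

    ## reduce: merge the partial counts
    reduced_counter = reduce(Counter.__add__, partial_counters)
    print(reduced_counter)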

mrucci
0

First of all, instead of checking the membership of tup in dict.keys() on every iteration, which is a really bad idea (in Python 2 it builds a list of all keys each time), you can use collections.defaultdict(), which is more Pythonic:

from collections import defaultdict
test_dict = defaultdict(int)

for tup in y:
    tup = tuple(sorted(tup))
    test_dict[tup] += 1

Secondly, if you want to use concurrency, you can use either multithreading or multiprocessing. With multithreading, because of the GIL, multiple threads cannot execute bytecode at the same time, so, for example, you cannot traverse your tuples from both sides at once as in a BDS algorithm.

But with multiprocessing you'll have another problem: each core needs to access one shared memory and work on it at the same time. For more info, read this answer: https://stackoverflow.com/a/30132673/2867928.

So now what would be the trick?

One way is to divide your list into small pieces and use multithreading (or, since this counting is CPU-bound, multiprocessing) to apply your function to each part, for example:
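
A minimal illustrative sketch of that idea, not from the original answer (the count_piece helper, the worker count, and the use of concurrent.futures are assumptions), using a process pool because the work is CPU-bound:

from collections import Counter
from concurrent.futures import ProcessPoolExecutor

def count_piece(piece):
    # count the sorted (order-insensitive) tuples in one piece
    return Counter(tuple(sorted(t)) for t in piece)

if __name__ == "__main__":
    y = [(1, 2), (2, 3), (1, 2), (5, 6)] * 1000
    n_workers = 4
    size = len(y) // n_workers + 1
    pieces = [y[i:i + size] for i in range(0, len(y), size)]

    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        total = sum(executor.map(count_piece, pieces), Counter())

    print(total)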

Another way is to use coroutines and subroutines: as mentioned in that answer, Greg Ewing has a great demonstration of how to use yield from to build things like schedulers or many-actor simulations with coroutines.

Mazdak
0

Edit: Answer edited to be thread-safe

The multiprocessing module makes it easy.

Simply refactor your code to have the processing done in a function:

def process_tuple(tuples):
    count_dict = {}
    for tuple_ in tuples:
        tuple_=tuple(sorted(tuple_))
        if tuple_ in count_dict:
            count_dict[tuple_] += 1
        else:
            count_dict[tuple_] = 1
    return count_dict

Split the tuples list into small groups, then use map to process all of your groups.

from multiprocessing import Pool

## http://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks-in-python
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in xrange(0, len(l), n):
        yield l[i:i+n]

# cut the tuples list into chunks of 5 tuples each
tuples_groups = chunks(tuples, 5)
pool = Pool(5)
count_dict = {}
# processes chunks in parallel
results = pool.map(process_tuple, tuples_groups)
# collect results
for result in results:
    count_dict.update(result)

multiprocessing.Pool will handle the distribution of work between the worker processes.

Full example + benchmarks:

import time
import random

start_time = time.time()
tuples = []
x,y = (100000, 10)
for i in range(x):
    tuple_ = []
    for j in range(y):
        tuple_.append(random.randint(0, 9))
    tuples.append(tuple(tuple_))

print("--- %s data generated in %s seconds ---" % (x*y, time.time() - start_time))



def process_tuple(tuples):
    count_dict = {}
    for tuple_ in tuples:
        tuple_=tuple(sorted(tuple_))
        if tuple_ in count_dict:
            count_dict[tuple_] += 1
        else:
            count_dict[tuple_] = 1
    return count_dict

from multiprocessing import Pool

start_time = time.time()

## http://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks-in-python
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in xrange(0, len(l), n):
        yield l[i:i+n]

# cut the tuples list into chunks of 5 tuples each
tuples_groups = chunks(tuples, 5)
pool = Pool(5)
count_dict = {}
# processes chunks in parallel
results = pool.map(process_tuple, tuples_groups)
# collect results
for result in results:
    count_dict.update(result)

print("--- Multithread processed in %s seconds ---" % (time.time() - start_time))    



start_time = time.time()
count_dict = {}
for tuple_ in tuples:
    tuple_=tuple(sorted(tuple_))
    if tuple_ in count_dict:
        count_dict[tuple_] += 1
    else:
        count_dict[tuple_] = 1

print("--- Single thread processed in %s seconds ---" % (time.time() - start_time))

--- 10000000 data generated in 32.7803010941 seconds ---
--- Multiprocessing processed in 1.79116892815 seconds ---
--- Single thread processed in 2.65010404587 seconds ---

Cyrbil
0

Because you want to increment counts (not simply create new key/value pairs), a shared dictionary is not thread-safe to use unless you acquire a semaphore around each update and release it afterwards, so I don't think you will get any overall speed gain; in fact it may be slower.

If you are going to thread this, it is probably best for each thread to update its own dictionary and then merge the results as each thread finishes; that way there is no doubt about thread-safety. However, because the work is likely to be CPU-bound, you should use multiprocessing rather than threads: multiprocessing can make use of all your CPU cores.

Also, if you use a collections.Counter, it will do the counting for you, it supports merging, and it has a useful method, most_common(n), to return the keys with the n highest counts.
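
A minimal sketch of that per-worker-then-merge approach, not part of the original answer (count_chunk, the chunk size, and the use of imap_unordered are illustrative choices):

from collections import Counter
from multiprocessing import Pool

def count_chunk(chunk):
    # each worker builds its own Counter over its chunk
    return Counter(tuple(sorted(t)) for t in chunk)

if __name__ == "__main__":
    y = [(1, 2), (2, 3), (1, 2), (5, 6), (2, 1)] * 100000
    chunk_size = 100000
    pieces = [y[i:i + chunk_size] for i in range(0, len(y), chunk_size)]

    merged = Counter()
    with Pool() as pool:
        for partial in pool.imap_unordered(count_chunk, pieces):
            merged.update(partial)   # Counter.update adds counts when merging

    print(merged.most_common(2))     # the 2 most frequent pairs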