4

I'm not sure whether this is possible (I hope so). I have a dataset that I run through a process that uses defaultdict. A defaultdict has a feature where, if you look up a key that isn't in the dictionary, the key gets added with a default value. In my case I look up values, which adds them; later I look those values up again, and if they are in the dict I set the value to True instead of the default False. This works without problems, but I start getting incorrect results once I try to multiprocess it. (The real data and process are pretty large, and I have multicore hardware, so why not use it, right?) Here are my results. The size of the table with multiprocessing keeps changing: sometimes it is the same as without multiprocessing, but often it is slightly smaller:

size of Table(with multiprocesing) is: 398
total number of true(with multiprocesing) is  0
size of Table(without multiprocesing) is  402
total number of true(without multiprocessing) is  250
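
For reference, here is a minimal sketch of the defaultdict behaviour described above. It is written for Python 3 (the code in this post is Python 2), and the keys are made up for illustration. A plain lookup of a missing key inserts it with the default value, while a membership test does not.

```python
from collections import defaultdict

T = defaultdict(bool)      # missing keys default to bool() == False
T[0, 0] = True             # one explicit entry

hit = T[5, 0]              # lookup of a missing key: returns False AND inserts (5, 0)
present = (7, 0) in T      # membership test: does not insert anything

size = len(T)              # counts (0, 0) and (5, 0)
true_count = sum(1 for value in T.values() if value)
print(size, true_count)
```

This is why the table size can be larger than the number of True entries: many keys exist only because they were looked up.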

Anyway, here's some working code. The multiprocessing version is at the top and the version without multiprocessing is at the bottom. (I figured out how to share the defaultdict with all the new processes, but it still doesn't work.)

from multiprocessing import Pool
from multiprocessing.managers import BaseManager, DictProxy
from collections import defaultdict

class MyManager(BaseManager):
    pass

MyManager.register('defaultdict', defaultdict, DictProxy)

def test(i,x, T):
    target_sum = 100
    # T[x, i] is True if 'x' can be solved
    # by a linear combination of data[:i+1]
    #T = defaultdict(bool)           # all values are False by default
    T[0, 0] = True                # base case

    for s in range(target_sum + 1): #set the range of one higher than sum to include sum itself
            #print s
            for c in range(s / x + 1):  
                if T[s - c * x, i]:
                    T[s, i + 1] = True


data = [2,5,8]                
pool = Pool(processes=2)
mgr = MyManager()
mgr.start()
T = mgr.defaultdict(bool)
T[0, 0] = True 
for i, x in enumerate(data):    # i is index, x is data[i]
    pool.apply_async(test, (i,x, T))
pool.close()
pool.join()
pool.terminate()


print 'size of Table(with multiprocesing) is:', len(T)
count_of_true = []
for x in T.items():
    if T[x] == True:
       count_of_true.append(x)
print 'total number of true(with multiprocesing) is ', len(count_of_true)


#now lets try without multiprocessing
target_sum = 100
# T[x, i] is True if 'x' can be solved
# by a linear combination of data[:i+1]
T1 = defaultdict(bool)           # all values are False by default
T1[0, 0] = True                # base case


for i, x in enumerate(data):    # i is index, x is data[i]
    for s in range(target_sum + 1): #set the range of one higher than sum to include sum itself
            for c in range(s / x + 1):  
                if T1[s - c * x, i]:
                    T1[s, i + 1] = True

print 'size of Table(without multiprocesing) is ', len(T1)

count = []
for x in T1:
    if T1[x] == True:
        count.append(x)

print 'total number of true(without multiprocessing) is ', len(count)

I hope there's a solution for this. For the past two weeks I tried running this against a database, but it's too slow with very large datasets. The process above handles everything in memory (but still takes a few hours to run on my test data, which is why I want to use multiple cores on it).

Lostsoul

3 Answers

3

The behavior of defaultdict is easy to replicate using a standard dict. In this case, it looks to me like you could simply replace this line in test:

if T[s - c * x, i]:

with this line:

if T.get((s - c * x, i), False):

See if you can get this code working with a standard dictionary before bothering to customize a Manager object.
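
As a rough sketch of that suggestion, here is the sequential loop from the question rewritten with a plain dict and `get`. It uses Python 3 syntax, so `//` replaces the Python 2 integer `/`, and the function name `build_table` is made up for this example. Because `get` never inserts a key, only the True entries end up being stored.

```python
def build_table(data, target_sum):
    """Sequential version of the question's loop using a plain dict."""
    T = {(0, 0): True}                       # base case
    for i, x in enumerate(data):             # i is index, x is data[i]
        for s in range(target_sum + 1):
            for c in range(s // x + 1):
                # get() returns False for a missing key without adding it
                if T.get((s - c * x, i), False):
                    T[s, i + 1] = True
    return T

table = build_table([2, 5, 8], 100)
print(len(table))                            # every stored value is True
```

With the question's data this stores 250 entries, which matches the count of True values reported for the non-multiprocessing run.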

But actually it seems that each value of i will store values that may be accessed by the loop that handles i + 1. This means that the results for each loop depend on the previous loop, and so an asynchronous approach may produce errors.

To expand on this, try this code:

from multiprocessing import Pool
from multiprocessing.managers import BaseManager, DictProxy, ListProxy
from collections import defaultdict

class MyManager(BaseManager):
    pass

MyManager.register('defaultdict', defaultdict, DictProxy)
MyManager.register('list', list, ListProxy)

def test(i,x,T, order):
    target_sum = 100
    # T[x, i] is True if 'x' can be solved
    # by a linear combination of data[:i+1]
    #T = defaultdict(bool)          # all values are False by default
    T[0, 0] = True                  # base case
    for s in range(target_sum + 1): # set the range of one higher 
                                    # than sum to include sum itself
        for c in range(s / x + 1):
            if T[s - c * x, i]:
                T[s, i + 1] = True
                order.append(i)

def setup():
    mgr = MyManager()
    mgr.start()
    run_order = mgr.list()
    T = mgr.defaultdict(bool)
    T[0, 0] = True
    data = [2,5,8]
    return data, T, run_order

def async_loop(data, func, output, run_order, wait=False):
    pool = Pool(processes=6)
    for i, x in enumerate(data):    # i is index, x is data[i]
        p=pool.apply_async(func, (i, x, output, run_order))
        if wait:
            p.wait()
    pool.close()
    pool.join()
    pool.terminate()

def output_size(output, run_order, wait):
    wait = 'out' if wait else ''
    print 'size of Table (with{0} multiprocesing) is: {1}'.format(
        wait, len(output))
    count_of_true = []
    for (x, result) in output.items():
        if output[x] == True:
            count_of_true.append(x)
    print 'total number of true (with{0} multiprocesing) is: {1}'.format(
        wait, len(count_of_true))
    print 'run order is: {0}'.format(run_order)

data, table, run_order = setup()
async_loop(data, test, table, run_order, wait=True)
output_size(table, run_order, True)
data, table, run_order = setup()
async_loop(data, test, table, run_order, wait=False)
output_size(table, run_order, False)

The output is as follows:

size of Table (without multiprocesing) is: 402
total number of true (without multiprocesing) is: 250
run order is: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 
2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]
size of Table (with multiprocesing) is: 402
total number of true (with multiprocesing) is: 250
run order is: [0, 0, 0, 1, 1, 0, 1, 1, 0, 2, 1, 2, 2, 1, 0, 2, 1, 2, 2, 1, 1, 2, 2, 1, 2, 1, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 0, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 0, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 2, 0, 1, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 0, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 0, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 0, 1, 2, 2, 1, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 2, 1, 2, 1, 2, 2, 0, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 0, 2, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 0, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 0, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 2, 1, 2, 0, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2, 2, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 0, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 0, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 0, 2, 1, 2, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 0, 2, 1, 2, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 2, 1, 0, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 0, 2, 1, 2, 1, 2, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 2, 1, 0, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 2, 1, 2, 2, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 0, 1, 2, 1, 2, 
2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 0, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 0, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 2, 1, 2, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 2, 1, 2, 1, 2, 1, 2, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 2, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 0, 1, 2, 1, 2, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 2, 2, 1, 2, 0, 1, 2, 2, 1, 1, 2, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 0, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 0, 1, 2, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 2, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 2, 1, 0, 2, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 0, 2, 0, 0, 0, 0, 0, 0, 0]

So as you can see, the order of execution is sequential if you call p.wait() and is out-of-order if you don't. And because it's out of order, you'll notice that not all calculations for i = 0 are complete before the i = 1 and i = 2 calculations begin. That may sometimes mean that an i = 0 calculation writes to a key that i = 1 uses, but only after the i = 1 calculation has already read the key. (And indeed, although the table is the same length in the above example, you'll notice that the length of the order list is different. So something different is happening, even when it doesn't affect the final result.)
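
The order dependence can also be reproduced without any processes. The following sketch (Python 3, single process, with a hypothetical helper `run_in_order`) runs the three per-`i` tasks on an ordinary defaultdict, once in index order and once in reverse. The reverse run mimics the extreme case where later tasks read their keys before earlier tasks have written anything.

```python
from collections import defaultdict

def run_in_order(data, order, target_sum=100):
    """Run the per-index tasks one after another in the given order."""
    T = defaultdict(bool)
    T[0, 0] = True                           # base case
    for i in order:
        x = data[i]
        for s in range(target_sum + 1):
            for c in range(s // x + 1):
                if T[s - c * x, i]:          # reads what task i - 1 wrote
                    T[s, i + 1] = True
    # (table size, number of True entries)
    return len(T), sum(1 for v in T.values() if v)

data = [2, 5, 8]
print(run_in_order(data, [0, 1, 2]))         # tasks in index order
print(run_in_order(data, [2, 1, 0]))         # tasks in reverse order
```

The in-order run returns (402, 250), the same figures as the non-multiprocessing output, while the reverse run finds far fewer True entries. Real interleavings presumably fall somewhere between these two cases.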

senderle
  • Looks interesting. Let me play around with the code and get back to you. In the meantime, I did notice something strange: with your way it seems to report only the True entries (which is what I need, so it's perfect), but sometimes the count is 250 (which is correct) and other times I see 248 or 246. Any idea what I can do to fix the inconsistency? – Lostsoul Feb 21 '12 at 04:24
  • @Lostsoul, one thing that occurs to me is that you might have to do `if T.get(x, True):` in your test loop. But that would cause a `KeyError` if `x` were not in the dict, so it's probably not the problem. Also, if this is the same behavior you're getting with `defaultdict` then it's probably a problem with the algorithm, not `defaultdict`. I'll see if I can identify the problem. – senderle Feb 21 '12 at 04:31
  • Thanks, I'll do some digging too, to try to figure out what the inconsistency is (because both algorithms are exactly the same). With that said, I must say you're a genius. Your approach works fine; when it does work, the lists are the same. Worst case, I can run the multiprocessing version twice to make sure nothing was missed, but honestly that seems like a waste if there's a simple thing I can do to make sure the results are consistent. – Lostsoul Feb 21 '12 at 04:40
  • FYI, I'm not sure how correct this is, but if I launch a bunch of programs while this program is running, the results get a bit inconsistent, which makes me believe it may be the multiprocessing. – Lostsoul Feb 21 '12 at 04:40
  • @Lostsoul, well, I'm actually not sure they're identical. In the sequential (non-multiprocessing) version, it seems to me that the results of earlier loops do affect the results of later loops (because there's an `i + 1` in there). To me, that indicates that in the asynchronous version, if some processes complete in an out-of-order way, the results could differ in subtle and unpredictable ways. In short, this might not be a parallelizable algorithm. – senderle Feb 21 '12 at 04:47
  • I'll need to look into that more, but I believe i is the index counter, and all it's doing is adding an entry for the future value if its calculation exists. This should all actually be contained in the single loop. Also, it works most of the time; it's just sometimes off. Update: on second thought you might be correct. As I add more logic to the test function, the results start to drift. – Lostsoul Feb 21 '12 at 04:54
  • It's off by a lot: multi=52 and non-multi=542. I can get it to work by rerunning the multiprocessing version, but my problem will be that I won't know how many times to run it to make sure my number is accurate. – Lostsoul Feb 21 '12 at 05:04
  • Very interesting, is this implementation still relevant now? I'm trying it out in python 3.7 and the registered defaultdict doesn't seem to update. – Aly Shmahell Apr 01 '19 at 01:24
  • @AlyShmahell it has been a long time since I last looked at this, but to the best of my recollection, my recommendation was *not* to use a defaultdict, and instead to "emulate" one using get and setdefault. That way you don't have to deal with this problem. The code in my answer doesn't attempt to fix the defaultdict problem. It just demonstrates the phenomenon of out-of-order execution. – senderle Apr 01 '19 at 01:44
  • @senderle thank you for the explanation, I think if all fails I will go about it by emulating the container I'm trying to register (which is collections.Counter as my ultimate goal). – Aly Shmahell Apr 01 '19 at 04:30
  • @senderle Would you know [how to use **nested** defaultdict with multiprocessing?](https://stackoverflow.com/questions/60685275/how-to-use-nested-defaultdict-with-multiprocessing) – Jones Mar 16 '20 at 20:12
1

I did a quick check and, as I suspected, the child process is creating its own version of T. You need to set up a global variable and let the manager update the variable.

I added this to the test function to see what the id of T was:

T[0, 0] = True                # base case
filename = "test."+str(i)
with open(filename, "w") as f:   # the with block closes the file on exit
    f.write("address of T %x\n" % id(T))

address of T 823f50
address of T 955550
address of T 955bd0

So when the children complete, the parent never gets the updates.

I'll play with it a bit to set up a global or process-shared dict.

Rich
  • In `multiprocessing` everything is copied. But the `id` of `T` doesn't matter, because it's just a proxy for the real dict, which is managed by the `Manager` object. In other words, `T` _is_ a process-shared dict. – senderle Feb 21 '12 at 06:19
  • correct, I see that T was actually global due to its indent position. – Rich Feb 21 '12 at 07:13
1

There were a few things I fixed.

I removed T from the function calls, since this was killing the process variable you defined as manager.defaultdict(bool).

Edit: Actually, I just realized that T was global because there is no def main, so I restored T to the function calls. Sorry about that. :)

Edit 2: I also added p.wait() after your async call. I think this may be where you were seeing the drops. I noticed the same drops, but adding p.wait() looks to have stopped the drops from the children.

Edit 3: changed p.wait() to p.get(timeout=5)

You only need to pass the function arguments, not the global variables. Also, in your loop, the value of T once it completes is:

T = defaultdict(<type 'bool'>, {(7, 3): True, (90, 0): False, ... etc })

So I changed the for loop to pick up the key,value.
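
To illustrate why that change matters, here is a small sketch (Python 3, toy data) of what happens when the pairs from `items()` are used as keys on a defaultdict: each lookup misses, returns False, and quietly adds a junk entry. Taking a snapshot with `list(...)` is an assumption made here to mirror Python 2, where `items()` returns a list.

```python
from collections import defaultdict

T = defaultdict(bool)
T[0, 0] = True
T[1, 0] = False

snapshot = list(T.items())         # [((0, 0), True), ((1, 0), False)]

# Buggy: x is a (key, value) pair, so T[x] looks up a key that was never set
buggy = [x for x in snapshot if T[x] == True]

# Fixed: unpack the pair and test the value directly
fixed = [key for key, value in snapshot if value]

print(len(buggy), fixed, len(T))
```

The buggy loop finds nothing, which is consistent with the "total number of true ... is 0" line in the question, and it also leaves two extra keys behind in the table.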


size of Table(with multiprocesing) is: 402
total number of true(with multiprocesing) is  250
size of Table(without multiprocesing) is  402
total number of true(without multiprocessing) is  250

    from multiprocessing import Pool
    from multiprocessing.managers import BaseManager, DictProxy
    from collections import defaultdict

    class MyManager(BaseManager):
        pass

    MyManager.register('defaultdict', defaultdict, DictProxy)

    def test(i,x,T):
        target_sum = 100
        # T[x, i] is True if 'x' can be solved
        # by a linear combination of data[:i+1]
        #T = defaultdict(bool)           # all values are False by default
        T[0, 0] = True                # base case
        for s in range(target_sum + 1): #set the range of one higher than sum to include sum itself
            for c in range(s / x + 1):  
                if T[s - c * x, i]:
                    T[s, i + 1] = True



    mgr = MyManager()
    mgr.start()
    T = mgr.defaultdict(bool)

    T[0, 0] = True 
    data = [2,5,8]                
    pool = Pool(processes=2)

    for i, x in enumerate(data):    # i is index, x is data[i]
        p=pool.apply_async(test,(i,x,T))
        p.get(timeout=5)
    pool.close()
    pool.join()
    pool.terminate()

    print 'size of Table(with multiprocesing) is:', len(T)
    count_of_true = []
    for (x, result) in T.items():
        if T[x] == True:
           count_of_true.append(x)
    print 'total number of true(with multiprocesing) is ', len(count_of_true)

    #==========================
    #now lets try without multiprocessing
    target_sum = 100
    # T[x, i] is True if 'x' can be solved
    # by a linear combination of data[:i+1]
    T1 = defaultdict(bool)           # all values are False by default
    T1[0, 0] = True                # base case


    for i, x in enumerate(data):    # i is index, x is data[i]
        for s in range(target_sum + 1): #set the range of one higher than sum to include sum itself
                for c in range(s / x + 1):  
                    if T1[s - c * x, i]:
                        T1[s, i + 1] = True

    print 'size of Table(without multiprocesing) is ', len(T1)

    count = []
    for x in T1:
        if T1[x] == True:
            count.append(x)

    print 'total number of true(without multiprocessing) is ', len(count)

Rich
  • "I also added..." Please don't add comments on your own answer. Please **update** the answer to be complete. And then remove the useless comment. – S.Lott Feb 21 '12 at 10:48
  • @Rich, adding `p.wait()` _completely defeats the point of multiprocessing_. It ensures that each of the processes executes in sequence, rather than concurrently. Add a `print i` statement inside the inner loop of `test` to see what I mean. – senderle Feb 21 '12 at 15:26
  • @Lostsoul, I just want to draw your attention to the fact that adding `p.wait()` at the end of every loop as this suggests works because it prevents any multiprocessing from happening. So his answer will actually slow down your computation (because it isn't concurrent and adds the overhead of multiprocessing). – senderle Feb 21 '12 at 15:32
  • @senderle, I tried p.get(timeout=5). It seems to have same effect as p.wait(). – Rich Feb 21 '12 at 23:42
  • I tried capturing the processes and then run p.get() outside the loop. The results continue to fluctuate. I think the pool process is correct - the problem is the loop in the test function - it may be a timing issue when one thread checks a value and the other process didn't calculate that position yet, resulting in incorrect results. The process worked with the p.wait(), because like you said, process 1 completed, so the table was correct when process 2 began. – Rich Feb 22 '12 at 00:20
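
The serialising effect of waiting inside the loop, as discussed in the comments above, can be seen in a small sketch. It uses `multiprocessing.pool.ThreadPool` as a stand-in for `Pool` (an assumption made here to keep the example self-contained; it offers the same `apply_async` and `wait` calls), and the helper `peak_concurrency` and the sleep-based task are made up for illustration.

```python
import threading
import time
from multiprocessing.pool import ThreadPool

def peak_concurrency(wait, tasks=3):
    """Return the largest number of tasks that were running at the same time."""
    lock = threading.Lock()
    state = {"running": 0, "peak": 0}

    def task(_):
        with lock:
            state["running"] += 1
            state["peak"] = max(state["peak"], state["running"])
        time.sleep(0.1)                      # stand-in for real work
        with lock:
            state["running"] -= 1

    pool = ThreadPool(processes=tasks)
    for n in range(tasks):
        result = pool.apply_async(task, (n,))
        if wait:
            result.wait()                    # blocks until this task is finished
    pool.close()
    pool.join()
    return state["peak"]

print(peak_concurrency(wait=True))           # never more than one task at a time
print(peak_concurrency(wait=False))          # tasks are free to overlap
```

With `wait=True` the peak is always 1, because the next task is not even submitted until the previous one has finished. Without it the tasks can overlap, which is the point of using a pool but also what exposes the ordering problem in `test`.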