
I'm analyzing a large graph, so I divide the node list into chunks and run them in parallel, hoping a multi-core CPU will make it faster. However, my model is randomized, so the results of separate runs should generally differ. While testing the idea I get the same result every time, so I'm wondering whether my code is correct.

Here's my code:

from multiprocessing import Process, Queue

# split a list into chunks of size n (the last chunk may be shorter)
def chunks(l, n):
    return [l[i:i+n] for i in range(0, len(l), n)]

def multiprocessing_icm(queue, nodes):
    queue.put(independent_cascade_igraph(twitter_igraph, nodes, steps=1))

def dispatch_jobs(data, job_number):
    total = len(data)
    chunk_size = total // job_number  # integer division; a bare / returns a float on Python 3 and breaks range()
    slices = chunks(data, chunk_size)
    jobs = []
    queue = Queue()
    for s in slices:
        j = Process(target=multiprocessing_icm, args=(queue, s))
        jobs.append(j)
    for j in jobs:
        j.start()
    # drain the queue before joining: a child blocks on queue.put() when its
    # result is large and nothing reads it, so joining first can deadlock
    results = [queue.get() for _ in jobs]
    for j in jobs:
        j.join()

    return results

dispatch_jobs(['121817564', '121817564'], 2)

In case you're wondering what independent_cascade_igraph is, here's the code:

import copy
import random

def independent_cascade_igraph(G, seeds, steps=0):
    # init activation probabilities
    for e in G.es():
        if 'act_prob' not in e.attributes():
            e['act_prob'] = 0.1
        elif e['act_prob'] > 1:
            raise Exception("edge activation probability:", e['act_prob'], "cannot be larger than 1")

    # perform diffusion
    A = copy.deepcopy(seeds)  # prevent side effect
    if steps <= 0:
        # perform diffusion until no more nodes can be activated
        return _diffuse_all(G, A)
    # perform diffusion for at most "steps" rounds
    return _diffuse_k_rounds(G, A, steps)

def _diffuse_all(G, A):
    tried_edges = set()
    layer_i_nodes = [ ]
    layer_i_nodes.append([i for i in A])  # prevent side effect
    while True:
        len_old = len(A)
        (A, activated_nodes_of_this_round, cur_tried_edges) = _diffuse_one_round(G, A, tried_edges)
        layer_i_nodes.append(activated_nodes_of_this_round)
        tried_edges = tried_edges.union(cur_tried_edges)
        if len(A) == len_old:
            break
    return layer_i_nodes

def _diffuse_k_rounds(G, A, steps):
    tried_edges = set()
    layer_i_nodes = [ ]
    layer_i_nodes.append([i for i in A])
    while steps > 0 and len(A) < G.vcount():
        len_old = len(A)
        (A, activated_nodes_of_this_round, cur_tried_edges) = _diffuse_one_round(G, A, tried_edges)
        layer_i_nodes.append(activated_nodes_of_this_round)
        tried_edges = tried_edges.union(cur_tried_edges)
        if len(A) == len_old:
            break
        steps -= 1
    return layer_i_nodes

def _diffuse_one_round(G, A, tried_edges):
    activated_nodes_of_this_round = set()
    cur_tried_edges = set()
    for s in A:
        for nb in G.successors(s):
            if nb in A or (s, nb) in tried_edges or (s, nb) in cur_tried_edges:
                continue
            if _prop_success(G, s, nb):
                activated_nodes_of_this_round.add(nb)
            cur_tried_edges.add((s, nb))
    activated_nodes_of_this_round = list(activated_nodes_of_this_round)
    A.extend(activated_nodes_of_this_round)
    return A, activated_nodes_of_this_round, cur_tried_edges

def _prop_success(G, src, dest):
    '''
    act_prob = 0.1
    for e in G.es():
        if (src, dest) == e.tuple:
            act_prob = e['act_prob']
            break
    '''
    return random.random() <= 0.1

Here's the result of the multiprocessing run (both chunks, each seeded with the same id, produce identical output):

[['121817564'], [1538, 1539, 4, 517, 1547, 528, 2066, 1623, 1540, 538, 1199, 31, 1056, 1058, 547, 1061, 1116, 1067, 1069, 563, 1077, 1591, 1972, 1595, 1597, 1598, 1088, 1090, 1608, 1656, 1098, 1463, 1105, 1619, 1622, 1111, 601, 1627, 604, 1629, 606, 95, 612, 101, 1980, 618, 1652, 1897, 1144, 639, 640, 641, 647, 650, 1815, 1677, 143, 1170, 1731, 660, 1173, 1690, 1692, 1562, 1563, 1189, 1702, 687, 689, 1203, 1205, 1719, 703, 1219, 1229, 1744, 376, 1746, 211, 1748, 213, 1238, 218, 221, 735, 227, 1764, 741, 230, 1769, 1258, 1780, 1269, 1783, 761, 763, 1788, 1789, 1287, 769, 258, 1286, 263, 264, 780, 1298, 1299, 1812, 473, 1822, 1828, 806, 811, 1324, 814, 304, 478, 310, 826, 1858, 1349, 326, 327, 1352, 329, 1358, 336, 852, 341, 854, 1879, 1679, 868, 2022, 1385, 1902, 1904, 881, 1907, 1398, 1911, 888, 1940, 1402, 1941, 1920, 1830, 387, 1942, 905, 1931, 1411, 399, 1426, 915, 916, 917, 406, 407, 1433, 1947, 1441, 419, 1445, 1804, 428, 1454, 1455, 948, 1973, 951, 1466, 443, 1468, 1471, 1474, 1988, 966, 1479, 1487, 976, 467, 1870, 2007, 985, 1498, 990, 1504, 1124, 485, 486, 489, 492, 2029, 2033, 1524, 1534, 2038, 1018, 1535, 510, 1125]]
[['121817564'], [1538, 1539, 4, 517, 1547, 528, 2066, 1623, 1540, 538, 1199, 31, 1056, 1058, 547, 1061, 1116, 1067, 1069, 563, 1077, 1591, 1972, 1595, 1597, 1598, 1088, 1090, 1608, 1656, 1098, 1463, 1105, 1619, 1622, 1111, 601, 1627, 604, 1629, 606, 95, 612, 101, 1980, 618, 1652, 1897, 1144, 639, 640, 641, 647, 650, 1815, 1677, 143, 1170, 1731, 660, 1173, 1690, 1692, 1562, 1563, 1189, 1702, 687, 689, 1203, 1205, 1719, 703, 1219, 1229, 1744, 376, 1746, 211, 1748, 213, 1238, 218, 221, 735, 227, 1764, 741, 230, 1769, 1258, 1780, 1269, 1783, 761, 763, 1788, 1789, 1287, 769, 258, 1286, 263, 264, 780, 1298, 1299, 1812, 473, 1822, 1828, 806, 811, 1324, 814, 304, 478, 310, 826, 1858, 1349, 326, 327, 1352, 329, 1358, 336, 852, 341, 854, 1879, 1679, 868, 2022, 1385, 1902, 1904, 881, 1907, 1398, 1911, 888, 1940, 1402, 1941, 1920, 1830, 387, 1942, 905, 1931, 1411, 399, 1426, 915, 916, 917, 406, 407, 1433, 1947, 1441, 419, 1445, 1804, 428, 1454, 1455, 948, 1973, 951, 1466, 443, 1468, 1471, 1474, 1988, 966, 1479, 1487, 976, 467, 1870, 2007, 985, 1498, 990, 1504, 1124, 485, 486, 489, 492, 2029, 2033, 1524, 1534, 2038, 1018, 1535, 510, 1125]]

But here's what happens if I run independent_cascade_igraph twice directly:

independent_cascade_igraph(twitter_igraph, ['121817564'], steps=1)
[['121817564'], [514, 1773, 1540, 1878, 2057, 1035, 1550, 2064, 1042, 533, 1558, 1048, 1054, 544, 545, 1061, 1067, 1885, 1072, 350, 1592, 1460, ...

independent_cascade_igraph(twitter_igraph, ['121817564'], steps=1)
[['121817564'], [1027, 2055, 8, 1452, 1546, 1038, 532, 1045, 542, 546, 1059, 549, 1575, 1576, 2030, 1067, 1068, 1071, 564, 573, 575, 1462, 584, 1293, 1105, 595, 599, 1722, 1633, 1634, 614, 1128, 1131, 1286, 621, 1647, 1648, 627, 636, 1662, 1664, 1665, 130, 1671, 1677, 656, 1169, 148, 1686, 1690, 667, 1186, 163, 1700, 1191, 1705, 1711, ...

So, what I'm hoping for is this: given a list of 500 ids, the first CPU calculates the first 250, the second CPU calculates the last 250, and the results are then merged. A sketch of what I mean is below. I'm not sure I understand multiprocessing correctly.
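To make the merge step concrete, here's a rough sketch using the functions above, assuming dispatch_jobs returns the collected results as shown; the ids are placeholders and the set-union merge is just illustrative:

ids = [str(i) for i in range(500)]   # placeholder ids
results = dispatch_jobs(ids, 2)      # first 250 ids on one process, last 250 on the other
merged = set()
for layers in results:               # one layer-list per chunk
    for layer in layers:
        merged.update(layer)
print(len(merged), 'distinct nodes activated across both chunks')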

— toy
2 Answers


As mentioned e.g. in this SO answer, on *nix child processes inherit the state of the parent's RNG when they are forked. Call random.seed() in every child process to re-initialize it yourself, either with an explicit per-process seed or randomly (with no argument it seeds from OS entropy).
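A minimal sketch of the fix, applied to the worker from the question (multiprocessing_icm, twitter_igraph and independent_cascade_igraph are the question's own names):

import random

def multiprocessing_icm(queue, nodes):
    # Children forked on *nix start from a copy of the parent's RNG state,
    # so without re-seeding every worker draws the identical random stream.
    random.seed()  # re-seed from OS entropy; a per-process value such as os.getpid() also works
    queue.put(independent_cascade_igraph(twitter_igraph, nodes, steps=1))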

— fjarri

I haven't read your program in detail, but my general feeling is that you probably have a random number generator seeding problem. If you run the program twice on the same CPU, the generator's state will be different the second time you run it. But if you run it on two different CPUs, your generators may be initialized with the same default seed (inherited from the parent), thus giving the same results.
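Here's a self-contained way to check this, independent of the graph code. The inherited-state behaviour shows up under the fork start method (the default on Linux); under spawn, e.g. on Windows, children re-import the random module and get fresh seeds anyway:

import random
from multiprocessing import Process, Queue

def worker(q, reseed):
    if reseed:
        random.seed()  # fresh per-process seed from OS entropy
    q.put([random.random() for _ in range(3)])

if __name__ == '__main__':
    for reseed in (False, True):
        q = Queue()
        procs = [Process(target=worker, args=(q, reseed)) for _ in range(2)]
        for p in procs:
            p.start()
        results = [q.get() for _ in procs]  # drain before join
        for p in procs:
            p.join()
        print('reseed =', reseed, '-> identical:', results[0] == results[1])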

— Julien