
I want to build a fully connected graph in parallel in Python and also collect the edge values, like:
( node1, node2 ) = edge_value
stored in a dictionary of the form:
{ (node1, node2): edge_value, ... }

To do this, I first initialize two global variables: G for the graph and f_correlation for the aforementioned dictionary:

import networkx as nx
from multiprocessing import Pool
G = nx.Graph()
f_correlation = {}

A function is then created to construct the graph and also store each
( node1, node2 ) = edge_value pair in the f_correlation dictionary:

def construct_graph_parallelly(pair_with_df):
    global G
    global f_correlation
    pair, df = pair_with_df
    i, j = pair
    # calculate the edge value and store it in the global variable f_correlation
    f_correlation[ (i, j) ] = calculate_value(df, i, j)    # this function calculates some value on the dataframe
    # here i, j are nodes in the graph
    G.add_edge(i, j, weight = f_correlation[ (i, j) ])
    return f_correlation

Then a multiprocessing.Pool() instance is created and its .map() method is called, to let the code execute concurrently:

def make_all_pair_with_df(node_list, df):
    all_pair_with_df = []
    for i in node_list:
        for j in node_list:
            if i != j:
                pair_with_df = ((i, j), df)
                all_pair_with_df.append(pair_with_df)

    return all_pair_with_df

node_list = ['a', 'b', 'c', 'd', 'e']
pool = Pool()
all_pair_with_df = make_all_pair_with_df(node_list, df) 
f_correlation = pool.map(construct_graph_parallelly, all_pair_with_df)
pool.close()
print("DONE")

But when I run the code, it runs forever and never prints "DONE".

One of the problems may be the global-variable issue discussed in Globals variables and Python multiprocessing.

But for my work, I need to update both the dictionary and the graph globally.

How can I do this, or what modifications should I make to get this working?

user3666197
  • Is there any reason for not generating the graph first and updating the data second? My first guess for the infinite run is that the code keeps overwriting a partially generated graph or something like that. This probably won't happen if you have the graph prepared beforehand. Also note that creating the full graph is as easy as `complete_graph(n, create_using=None)` (courtesy of the networkx docs). Updating should be possible. – Shamis Nov 25 '20 at 11:29
  • There are always issues with multiple workers accessing the same object; that is why locks are a thing. It might be necessary to restructure your operation - for example, if the computation is lengthy and the creation of the graph itself is not such an issue, generate all the values first, then create the graph (sketched after these comments). Also, since it's a complete graph, wouldn't it make more sense to use an N x N array (matrix or whatever) to store the values? – Shamis Nov 25 '20 at 11:33
  • I have checked that when I call the *pool.map* function, it never enters the target function *construct_graph_parallelly*. The global variables in *construct_graph_parallelly* may be the reason. As for computing the edge values early, I can do that. But for the complete graph creation I need to iterate over all node pairs from the node list to assign the edge values. Does *complete_graph* or any other method take arguments of this type, where I provide a list of node pairs with corresponding edge values *( node_pairs: edge_value )* and it gives me back the constructed complete graph? – Hasan Tarek Nov 28 '20 at 14:13
  • The complexity of the complete graph is N*N (here, N is very large); that's why I need to parallelize this code to reduce the execution time. Your help would be very useful to me. – Hasan Tarek Nov 28 '20 at 14:13
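
A minimal sketch of the restructuring suggested in these comments: have the workers return their results instead of mutating globals, collect the edge values with pool.map, then build the graph serially in the parent process. calculate_value here stands in for the asker's helper and is assumed to be defined at module level:

import networkx as nx
from multiprocessing import Pool

def compute_edge(pair_with_df):
    # runs in a worker process; returns the value instead of mutating globals
    (i, j), df = pair_with_df
    return (i, j), calculate_value(df, i, j)

def build_graph(node_list, df):
    pairs = [((i, j), df) for i in node_list for j in node_list if i != j]
    with Pool() as pool:
        results = pool.map(compute_edge, pairs)  # list of ((i, j), value)
    f_correlation = dict(results)                # collected in the parent
    G = nx.Graph()
    for (i, j), value in f_correlation.items():
        G.add_edge(i, j, weight=value)           # serial, but cheap
    return G, f_correlation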

1 Answer


Update: Let's be a bit less ambitious and use multiprocessing just to build the f_correlation dictionary.

With your current code, each process has its own copy of the global variables. You should use shareable, managed types (see multiprocessing.SyncManager). For example:

import networkx as nx
from multiprocessing import Pool, Manager

# initialize this process's global variables:
def pool_initializer(the_dict):
    # initialize global variable with shared, managed dictionary
    global f_correlation
    f_correlation = the_dict

def construct_graph_parallelly(pair_with_df):
    global f_correlation
    pair, df = pair_with_df
    i, j = pair
    # calculate the edge value and store it in the global variable f_correlation
    f_correlation[(i, j)] = calculate_value(df, i, j)    # this function calculates some value on the dataframe

def main():    
    with Manager() as manager: # create SyncManager instance
        f_correlation = manager.dict() # create managed, shared dictionary
        # code to initialize G omitted
        with Pool(initializer=pool_initializer, initargs=(f_correlation,)) as pool:
            all_pair_with_df = make_all_pair_with_df(node_list, df) 
            pool.map(construct_graph_parallelly, all_pair_with_df)
            # now build graph
            G = nx.Graph()
            for k, v in f_correlation.items():
                i, j = k  # unpack the node pair
                G.add_edge(i, j, weight=v)
    
if __name__ == '__main__': # required for Windows
    main()
Booboo
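
To try the answer's snippet end to end, hypothetical stand-ins for the names it leaves undefined (node_list, df, and calculate_value; make_all_pair_with_df comes from the question) could look like the following. None of these are part of the original answer, and they must sit at module level so spawned workers can import them:

import numpy as np
import pandas as pd

node_list = ['a', 'b', 'c', 'd', 'e']
# dummy data: one column per node (an assumption about the asker's dataframe)
df = pd.DataFrame(np.random.rand(100, len(node_list)), columns=node_list)

def calculate_value(df, i, j):
    # placeholder edge metric: absolute Pearson correlation of two columns
    return abs(df[i].corr(df[j]))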
  • In Python, *manager* has a built-in method *dict()* to create a shared dictionary object that will be updated globally by each process, but for the graph there is no built-in method for a shareable graph object. I need to update the dictionary *f_correlation* as well as the single graph *G* from each process, *globally*. From your code I may be able to update the dictionary globally, but for the graph I am not getting it. You are initializing a list/array of graphs in the *pool_initializer* function, but I want only one graph *G* that is updated (globally) by each process. How can I do it? – Hasan Tarek Nov 28 '20 at 13:37
  • You will probably have to use an `Array` of the appropriate type. You can also look at the shared `ctypes` objects that the `multiprocessing.sharedctypes` module provides. – Booboo Nov 28 '20 at 13:46
  • If I use *manager.Array(typecode, sequence)*, what would be the appropriate type for a graph? Also, how can I initialize it? If I initialize it with *manager.Array(typecode, sequence)*, it will return an array, and then for edge-value assignment I will not be able to use *G.add_edge(i, j, weight = f_correlation[ (i, j) ])* in the *construct_graph_parallelly* function, as G will now be an Array; the *add_edge* method is for a single graph object, not for an Array/list object. – Hasan Tarek Nov 28 '20 at 14:41
  • See updated answer. How much multiprocessing buys you, if anything, will depend on the complexity of function `calculate_value`, but I wouldn't think that there is much value in trying to multi-process the adding of edges to a graph. – Booboo Nov 28 '20 at 16:40
  • While running the code, it says the *calculate_value* function is undefined. If the calculation done in *calculate_value* is moved into the *construct_graph_parallelly* function, it works fine (using the *multiprocess* module instead of *multiprocessing*; I am working in Anaconda, Jupyter Notebook, Python 3.6). There is another issue: why do I need to import *numpy* and other modules again inside the *construct_graph_parallelly* function, when I have already imported them outside it? Is there any solution? – Hasan Tarek Nov 29 '20 at 15:43
  • `calculate_value` is undefined because in your question posting you reference it without providing the definition of that function, presumably because it is defined elsewhere and you chose to omit it to avoid clutter. Am I supposed to guess its definition? As for your second question, I don't see enough of your code to quite follow the issue. Perhaps it's time to post a second question? – Booboo Nov 29 '20 at 17:52
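
A note on the last two comments: under the spawn start method (the default on Windows, and on macOS since Python 3.8), a worker process locates the mapped function by importing the module it was defined in. So the worker function, every helper it calls (such as calculate_value), and the imports they rely on must live at module level in an importable file; code defined only in a notebook's __main__ cannot be re-imported that way, which is why the dill-based multiprocess module is a common workaround in Jupyter. A minimal layout, with workers.py as a hypothetical module name:

# workers.py (hypothetical file): everything the pool workers need
# lives at module level, so each spawned process can import it by name
import numpy as np   # module-level import, visible inside every worker

def calculate_value(df, i, j):
    ...  # the asker's elided computation goes here

def compute_edge(pair_with_df):
    (i, j), df = pair_with_df
    return (i, j), calculate_value(df, i, j)

The main script or notebook then imports compute_edge from workers and passes it to pool.map.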