Assuming that there is a dummy.txt file that holds the following information:
a_to_b_from_c 20
a_to_b_from_w 30
a_to_b_from_k 20
a_to_b_from_l 10
c_to_f_from_e 30
c_to_f_from_k 20
c_to_f_from_l 10
(Numerical values are only 10, 20, and 30) and the following code:
import multiprocessing

global conns

class Line():
    def __init__(self, text, group) -> None:
        self.text = text
        self.group = int(group)

    def get_link(self):
        # 'a_to_b_from_c' -> 'a_to_b'
        return self.text.split('_from')[0]

    def __repr__(self):
        return f"<{self.group},{self.text}>"

class Groups():
    def __init__(self, name) -> None:
        self.name = name
        self.groups = {k: set() for k in [10, 20, 30]}

    def add_to_dict(self, line: Line):
        # Store the line's text under its numerical group (cost)
        cost = line.group
        if cost not in self.groups:
            self.groups[cost] = set()
        self.groups[cost].add(line.text)

def thread_f(item: Line):
    # Update the dictionary of the appropriate Groups object
    global conns
    key = item.get_link()
    conns[key].add_to_dict(item)

def main():
    global conns
    # Parse the file and store the information in an iterable
    with open('dummy.txt') as f:
        info = [Line(*line.strip().split()) for line in f]
    # Update the global (shared) object and initialize a dictionary
    # with the following initialization:
    # { 'a_to_b': Groups('a_to_b'), 'c_to_f': Groups('c_to_f') }
    conns = {k: Groups(k) for k in {x.get_link() for x in info}}
    # Update the shared object according to the iterable information
    with multiprocessing.Pool(5) as pool:
        res = pool.map(thread_f,     # add the items to the appropriate key
                       info,         # the lines
                       chunksize=1)  # respect the order
    # Display the results
    for group_name, group_obj in conns.items():
        print(f"Grp Name {group_name} has the following:")
        for cost, connections in group_obj.groups.items():
            print(cost, connections)

if __name__ == "__main__":
    main()
What I am trying to do is to first parse the file and generate a Line object for every line of the file. After the parsing is done, I update the global variable conns, which I intend to use as a shared variable for all the workers of the pool. Then, in the thread_f function, I update the global variable (dictionary) by adding the respective Line to the appropriate Groups object's dictionary field.
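For clarity, this is what a single parsed line turns into with the classes above (just an illustrative snippet, not part of the script):

line = Line(*"a_to_b_from_c 20".split())   # text='a_to_b_from_c', group=20
print(line.get_link())   # -> 'a_to_b'  (everything before '_from')
print(line)              # -> <20,a_to_b_from_c>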
The problem is that when I try to display the information, nothing of the above shows up; all I get is a collection of empty sets:
Grp Name a_to_b has the following:
10 set()
20 set()
30 set()
Grp Name c_to_f has the following:
10 set()
20 set()
30 set()
Instead, I was expecting the following:
Grp Name a_to_b has the following:
10 {'a_to_b_from_l'}
20 {'a_to_b_from_c', 'a_to_b_from_k'}
30 {'a_to_b_from_w'}
Grp Name c_to_f has the following:
10 {'c_to_f_from_l'}
20 {'c_to_f_from_k'}
30 {'c_to_f_from_e'}
Since Python's multiprocessing (on Linux) practically follows a fork approach, I do understand that the child processes have access to the parent's already-initialized data, but their changes have no effect back in the parent. After reading the docs and searching on S.O., I found out about the Manager objects of the multiprocessing package. The thing is that I am unable to create a Manager.dict() that is already initialized (like I have done in my case with the conns comprehension).
How can I achieve the aforementioned desired behavior?
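To make the behaviour I am describing concrete, here is a tiny self-contained sketch (independent of the MWE above) of the effect: a worker's change to an inherited global never reaches the parent.

import multiprocessing

data = {"counter": 0}   # each child process works on its own copy of this

def worker(_):
    data["counter"] += 1           # modifies the child's copy only
    return data["counter"]

if __name__ == "__main__":
    with multiprocessing.Pool(2) as pool:
        print(pool.map(worker, range(4)))  # per-child counters, depends on scheduling
    print(data["counter"])                 # still 0 in the parent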
Yes, but why multiprocessing?
Well, this example is a mere MWE that I created to mimic what my actual code does. As a matter of fact, I am trying to speed up code that does not scale well for really large input files.
Regarding the Manager
Driven by a slightly similar question here [1], I did not manage to find a way of initializing a Manager.dict() from a pre-existing, i.e. already initialized, dictionary to pass into the spawned processes. Thus, I used sets, which guarantee that there will be no duplicate entries, and a global, already initialized variable to be continuously updated by the processes.
A work-around for the proposed Manager approach?
Well, since the computational effort increases with the usage of a shared resource such as the Manager.dict object, could the following be a potential work-around?
The idea is to:
- Avoid the usage of shared resources amongst processes
- Thus, avoid race conditions
So, assume that we have a working pool that consists of X processes in total. The task of each worker is to categorize each Line into the appropriate Groups object. The Lines are given to the workers as a list[Line] object. Thus, what if we use a somewhat divide-and-conquer approach like this:
+--------------------------------------------------------------+
| list[Line] |
+--------------------------------------------------------------+
| | | | | | | |
| | | | | | | |
<-d-> <-d-> <-d-> ... <-d->
The list is divided into X independent chunks/slices/sublists of size len(list)/X. Then, each of these iterables is given to one worker for processing.
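For the splitting itself, something along these lines could work (chunked is just a hypothetical helper name, not something from the standard library):

def chunked(items, n_chunks):
    # Split items into n_chunks contiguous sublists whose sizes differ by at most one
    size, rem = divmod(len(items), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        end = start + size + (1 if i < rem else 0)
        chunks.append(items[start:end])
        start = end
    return chunks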
But now the thread_f function has to be modified accordingly. It shall:
- Generate a dictionary where key = line.get_link() and value = the corresponding Groups object
- Fill this dictionary according to the Line objects of its given chunk/slice/sublist
- Return the dictionary (a sketch of such a worker follows right after this list)
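A rough sketch of what I have in mind for that per-chunk worker (only a sketch, the names are placeholders):

def thread_f(chunk):                 # now receives a list[Line] instead of a single Line
    local = {}                       # link -> Groups, private to this worker
    for line in chunk:
        key = line.get_link()
        if key not in local:
            local[key] = Groups(key)
        local[key].add_to_dict(line)
    return local                     # handed back to the parent by pool.map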
After the pool of procs has finished, the results, i.e. the per-worker dictionaries, have to be merged into a single one that holds the final solution.
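Putting it together, the parent could then do something along these lines (again only a sketch, assuming the chunked helper and the per-chunk thread_f sketched above):

def main():
    with open('dummy.txt') as f:
        info = [Line(*line.strip().split()) for line in f]

    n_procs = 5
    with multiprocessing.Pool(n_procs) as pool:
        partials = pool.map(thread_f, chunked(info, n_procs))

    # Merge the per-worker dictionaries into the final result
    conns = {}
    for partial in partials:
        for name, grp in partial.items():
            merged = conns.setdefault(name, Groups(name))
            for cost, connections in grp.groups.items():
                merged.groups.setdefault(cost, set()).update(connections)

    for group_name, group_obj in conns.items():
        print(f"Grp Name {group_name} has the following:")
        for cost, connections in group_obj.groups.items():
            print(cost, connections)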