
Assuming that there is a dummy.txt file that holds the following information:

a_to_b_from_c 20
a_to_b_from_w 30
a_to_b_from_k 20
a_to_b_from_l 10
c_to_f_from_e 30
c_to_f_from_k 20
c_to_f_from_l 10

(the numerical values are only 10, 20, and 30) and the following code:

import multiprocessing 

global conns 

class Line():
    def __init__(self, text, group) -> None:
        self.text = text 
        self.group = int(group)

    def get_link(self):
        return self.text.split('_from')[0]

    def __repr__(self):
        return f"<{self.group},{self.text}>"

class Groups():

    def __init__(self, name ) -> None: 
        self.name = name 
        self.groups = {k:set() for k in [10,20,30]}

    def add_to_dict(self,line : Line):
        
        connection = line.get_link()
        if connection not in self.groups.keys():
            self.groups[connection] = set()
        
        self.groups[connection].add(line.text)

def thread_f(item : Line):

    # Update the dictionary of every Group object accordingly
    global conns
 
    key = item.get_link()
     
    conns[key].add_to_dict(item)
    
def main():

    global conns 

    # Parse the file and store the information in an iterable
    with open('dummy.txt') as f:

        info = [ Line(*line.strip().split()) for line in f]

    # Update the global (shared) object and initialize a dictionary 
    # this has the following initialization: 
    # { a_to_b : set(), c_to_f : set() }
    conns = { k : Groups(k) for k in {x.get_link() for x in info} }

    # Update the shared object according to the iterable information
    with multiprocessing.Pool(5) as pool:

        res = pool.map(thread_f,     # add to the appropriate key the items 
                        info,        # the lines 
                        chunksize=1) # respect the order

    # Display the Results        
    for group_name, group_obj in conns.items():

        print(f"Grp Name {group_name} has the following:")

        for cost, connections in group_obj.groups.items():

            print(cost,connections)
        

if __name__ == "__main__":
    main()

What I am trying to do is to first parse the file and generate a Line object for every line. After the parsing is done, I update the global variable conns, which I intend to use as a shared variable for all the workers of the pool. Then, in the thread_f function, I update the global dictionary by adding each Line to the dictionary field of the appropriate Groups object.

The problem is that when I try to display the information, nothing seems to have been added; instead, I get a collection of empty sets:

Grp Name a_to_b has the following:
10 set()
20 set()
30 set()
Grp Name c_to_f has the following:
10 set()
20 set()
30 set()

What I was expecting is the following:

Grp Name a_to_b has the following:
10 set(a_to_b_from_l)
20 set(a_to_b_from_c,a_to_b_from_k)
30 set(a_to_b_from_w)
Grp Name c_to_f has the following:
10 set(c_to_f_from_l)
20 set(c_to_f_from_k)
30 set(c_to_f_from_e)

Since Python's multiprocessing essentially follows a fork approach, I do understand that the child processes have access to the parent's already-initialized data, but their changes have no effect back in the parent. After reading the docs and searching on Stack Overflow, I found out about the Manager objects of the multiprocessing package. The thing is that I am unable to generate a Manager.dict() that is already initialized (like I have done in my case with the conns comprehension).
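
To make that concrete, here is a tiny standalone sketch (separate from my code above, and with made-up names) of the behavior I mean: a child process can read and even mutate its copy of a module-level object, but the mutation never propagates back to the parent.

import multiprocessing

# Module-level state, initialized before the pool is created
shared = {"a_to_b": set()}

def worker(_):
    # The child mutates its own copy of the module-level dict
    shared["a_to_b"].add("added_in_child")
    return sorted(shared["a_to_b"])

if __name__ == "__main__":
    with multiprocessing.Pool(2) as pool:
        print(pool.map(worker, range(2)))  # each child reports ['added_in_child']
    print(shared)                          # the parent still prints {'a_to_b': set()}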

How can I achieve the aforementioned desired behavior?

Yes, but why multiprocessing?

Well, this example is a mere MWE that I created to mimic what my actual code does. As a matter of fact, I am trying to speed up code that does not scale well for really large input files.

Regarding the Manager

Driven by a slightly similar question here [1], I did not manage to find a way of initializing a Manager.dict() from a pre-existing, i.e., already initialized, dictionary to pass into the spawned processes. Thus, I used sets, which guarantee that there will be no duplicate entries, and a global, already initialized variable to be continuously updated by the processes.

A work-around for the proposed Manager approach?

Well, since the computational effort increases with the usage of a shared resource, namely the Manager.dict object, could a potential work-around be the following?

The idea is to:

  • Avoid the usage of shared resources amongst processes
  • Thus, avoid race conditions

So, assume that we have a working pool that consists of X processes in total. The task of each worker is to categorize each Line into the appropriate Group. The Lines are given to the workers as a list[Line] object. Thus, what if we used a divide-and-conquer approach like this:

+--------------------------------------------------------------+
|                        list[Line]                            |
+--------------------------------------------------------------+
|        |        |        |        |        |        |        |
|        |        |        |        |        |        |        |
  <-d->    <-d->    <-d->     ...                       <-d->

The list is divided into X independent chunks/slices/sublists of size len(list)/X. Then, each of these iterables is given to the workers for processing. But now the thread_f function has to be modified accordingly.

It shall:

  • Generate a dictionary of Groups where key = line.group and value = Groups object
  • Fill this dictionary according to the Line objects of its given chunk/slice/sublist.
  • Return the dictionary

After the pool of processes has finished, the results, i.e., the dictionaries, have to be merged into one that will hold the final solution.
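
For concreteness, below is a rough sketch of this idea (the helper names parse, worker_f, and merge are mine for illustration only, and I drop the Line and Groups classes to keep it short; each worker returns a plain dictionary of the form {link: {cost: set(texts)}} that mirrors the structure of conns):

import multiprocessing

def parse(line):
    text, group = line.strip().split()
    return text, int(group)

def worker_f(chunk):
    # Each worker builds its own local dictionary: {link: {cost: set(texts)}}
    local = {}
    for line in chunk:
        text, group = parse(line)
        link = text.split('_from')[0]
        local.setdefault(link, {}).setdefault(group, set()).add(text)
    return local

def merge(dicts):
    # Merge the per-worker dictionaries into the final result
    final = {}
    for d in dicts:
        for link, groups in d.items():
            for cost, texts in groups.items():
                final.setdefault(link, {}).setdefault(cost, set()).update(texts)
    return final

def main():
    X = 4  # number of worker processes
    with open('dummy.txt') as f:
        lines = f.readlines()

    # Split the list into X roughly equal, independent chunks
    size = max(1, (len(lines) + X - 1) // X)
    chunks = [lines[i:i + size] for i in range(0, len(lines), size)]

    # No shared state: each process works on its own chunk and returns its own dict
    with multiprocessing.Pool(X) as pool:
        partials = pool.map(worker_f, chunks)

    print(merge(partials))

if __name__ == "__main__":
    main()

Since no dictionary is shared between the processes, no Manager or Lock would be needed; the only serial part is the final merge in the parent.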

ex1led
  • Can you explain why you want to use multiprocessing here? Your code is pretty simple and I would expect it to execute in about a millisecond or less in a single process. By using multiprocessing you have made it very complicated. Also, can you explain why you can't use Manager.dict() to share a dictionary across multiple processes? Manager.dict() can take a mapping object, so the dictionary you already created can be converted to shared dict with one line of code. A sharable dictionary would go a long way toward meeting your requirement. – Paul Cornelius Feb 15 '22 at 02:52
  • @PaulCornelius it’s because the file is millions of lines long. I have created this as a small example. I will update my question with the answer to the manager approach. – ex1led Feb 15 '22 at 09:53

1 Answer


First, I believe you have an error in method Groups.add_to_dict. I have commented out the erroneous statement and added the correct statement after it:

import multiprocessing

def init_processes(d, the_lock):
    global conns, lock
    conns, lock = d, the_lock

class Line():
    def __init__(self, text, group) -> None:
        self.text = text
        self.group = int(group)

    def get_link(self):
        return self.text.split('_from')[0]

    def __repr__(self):
        return f"<{self.group},{self.text}>"

class Groups():

    def __init__(self, name ) -> None:
        self.name = name
        self.groups = {k:set() for k in [10,20,30]}

    def add_to_dict(self,line : Line):

        #connection = line.get_link()
        connection = line.group
        if connection not in self.groups.keys():
            self.groups[connection] = set()

        self.groups[connection].add(line.text)


def thread_f(item : Line):

    # Update the dictionary of every Group object accordingly
    global conns # Not strictly necessary

    key = item.get_link()

    # We need to let the managed dict know there is an updated value for the key:
    """
    conns[key].add_to_dict(item)
    """
    with lock:
        the_set = conns[key]
        the_set.add_to_dict(item)
        conns[key] = the_set # reset the reference

def main():

    # Parse the file and store the information in an iterable
    with open('dummy.txt') as f:

        info = [ Line(*line.strip().split()) for line in f]

    # Update the global (shared) object and initialize a dictionary
    # this has the following initialization:
    # { a_to_b : set(), c_to_f : set() }
    conns = multiprocessing.Manager().dict(
        { k : Groups(k) for k in {x.get_link() for x in info} }
    )

    # Update the shared object according to the iterable information
    lock = multiprocessing.Lock()
    with multiprocessing.Pool(5, initializer=init_processes, initargs=(conns, lock)) as pool:

        res = pool.map(thread_f,     # add to the appropriate key the items
                        info,        # the lines
                        chunksize=1) # respect the order

    # Display the Results
    for group_name, group_obj in conns.items():

        print(f"Grp Name {group_name} has the following:")

        for cost, connections in group_obj.groups.items():

            print(cost,connections)


if __name__ == "__main__":
    main()

Prints:

Grp Name c_to_f has the following:
10 {'c_to_f_from_l'}
20 {'c_to_f_from_k'}
30 {'c_to_f_from_e'}
Grp Name a_to_b has the following:
10 {'a_to_b_from_l'}
20 {'a_to_b_from_k', 'a_to_b_from_c'}
30 {'a_to_b_from_w'}

Update

I may be off base here, but it seems that most of the work I see is in the parsing of the input line. In your real case, whatever that might be, parsing might represent a negligible portion of your total processing (and if it doesn't, then, as I have previously mentioned in a comment, multiprocessing is not appropriate for this problem), but I see no reason not to move that processing to the multiprocessing pool itself.

I have greatly refactored the code, moving the line parsing into the Line class (renamed ProcessedLine below) and no longer seeing the need for the Groups class, since the merging of dictionaries and sets is being done by the main process.

I am using method imap_unordered rather than imap since it is generally slightly more efficient, and your previous code, which did not use the return values from the map method, did not depend on the order in which results were generated. So keys can be added to the dictionary in arbitrary order; why should order even matter in a dictionary to begin with?

You should be aware that if your input file has very many lines and your worker function requires non-trivial processing, you can fill the multiprocessing task queue much faster than the processes can empty it and you will potentially exhaust memory. I do have a solution to that, but that is another story.

import multiprocessing


class ProcessedLine():
    def __init__(self, text : str) -> None:
        self.text, group = text.strip().split()
        self.group = int(group)
        self.link = text.split('_from')[0]
        self.dict = {self.link: {self.group: set([self.text])}}

def process_line(text : str):
    processed_line = ProcessedLine(text)
    return processed_line

def compute_chunksize(iterable_size, pool_size):
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize

def main():

    def generate_lines():
        with open('dummy.txt') as f:
            for line in f:
                yield line

    ESTIMATED_NUMBER_OF_LINES_IN_FILE = 7
    POOL_SIZE = min(ESTIMATED_NUMBER_OF_LINES_IN_FILE, multiprocessing.cpu_count())
    # chunksize to be used with imap_unordered:
    chunksize = compute_chunksize(ESTIMATED_NUMBER_OF_LINES_IN_FILE, POOL_SIZE)
    pool = multiprocessing.Pool(POOL_SIZE)
    # Specify a chunksize value if the size of the iterable is large
    results = {}
    for processed_line in pool.imap_unordered(process_line, generate_lines(), chunksize=chunksize):
        link = processed_line.link
        if link not in results:
            # Just update with the entire dictionary
            results.update(processed_line.dict)
        else:
            # Update the set dictionary:
            set_dict = results[link]
            set_key = processed_line.group
            if set_key in set_dict:
                set_dict[set_key].add(processed_line.text)
            else:
                #set_dict[set_key] = set(processed_line.text)  # wrong: set() of a string yields a set of characters
                set_dict[set_key] = processed_line.dict[link][set_key]
    pool.close()
    pool.join()

    for group_name, groups in results.items():
        print(f'Group Name {group_name} has the following:')
        for k, v in groups.items():
            print('   ', k, '->', v)
        print()

if __name__ == "__main__":
    main()

Prints:

Group Name a_to_b has the following:
    20 -> {'a_to_b_from_c', 'a_to_b_from_k'}
    30 -> {'a_to_b_from_w'}
    10 -> {'a_to_b_from_l'}

Group Name c_to_f has the following:
    30 -> {'c_to_f_from_e'}
    20 -> {'c_to_f_from_k'}
    10 -> {'c_to_f_from_l'}
Booboo
  • The changes are indeed taking effect!! But the thing is that this is not consistent: after re-running it, it yields different results. Namely, some of the sets are not correctly updated. – ex1led Feb 15 '22 at 20:30
  • I think the problem may be due to the change necessitated in function `thread_f` to ensure the managed dictionary knows that one of its set values has been updated. If two processes now are updating in parallel the same key, they could be overlaying the same set value. I have added a lock to enforce serialization of this update. Let me know if everything now is always consistent. Of course, the locking limits parallelism. – Booboo Feb 15 '22 at 22:26
  • Surely the lock solves it. But the thing is that I employed the `set` just to avoid the scenario where two threads add the same value to the same key; the set should prohibit this by default since no duplicates are allowed. But in the `lock`-free case, values were actually missing from the results. That's what I am not getting. – ex1led Feb 15 '22 at 22:32
  • I should have said that in `thread_f` the values of the dictionary are `Groups` instances not `set` instances. The `Groups` instances have sets and, in fact, there is also a race condition in method `add_to_dict` with `if connection not in self.groups.keys(): self.groups[connection] = set()` and the following statement `self.groups[connection].add(line.text)` where two processes can clobber one another without the locking. – Booboo Feb 15 '22 at 22:53
  • Okay, here's another idea then. Assuming that I have `X` processes in my pool, can I split the workload, i.e., slice the connection list into `len(list)/X` segments, and assign each segment to one of the `X` processes? In this way, `thread_f` would also return a dictionary, and we would lastly aggregate, i.e., merge, them into one. In this way we can avoid the `lock` as well, I believe. Right? – ex1led Feb 15 '22 at 23:39
  • Before we spend more time on this, you should be aware that (1) if this is really representative of what you are trying to accomplish for real, then multiprocessing, with your worker function `thread_f` doing such minimal processing, will be *less* performant than serial processing, and (2) if what you are proposing, which honestly I am not fully following, doesn't use managed dictionaries, that could be an improvement, because there is a lot of overhead involved with managed objects. But that means that `thread_f` would now be doing even less work, making multiprocessing a worse fit. – Booboo Feb 16 '22 at 00:01
  • Frankly speaking...the equivalent code with actual multithreading in `c++` via `tbb` finishes in mere seconds...But I would really like to find a work-around for that with multiprocessing and I think that by removing the memory managers and by simply doing a divide and conquer approach such a thing would work. If you would like and were you available we could have a chat about it. – ex1led Feb 16 '22 at 00:17
  • I don’t know where you are (what time it is for you), but it’s evening and family time now for me. I will look at this again in the morning and perhaps have some ideas for you. Meanwhile, you could, if you are inclined, update your question explaining your new idea a bit more. – Booboo Feb 16 '22 at 00:29
  • Sure thing. My post has been updated with the idea / workaround that I have. Much obliged! – ex1led Feb 16 '22 at 00:59
  • See the updated answer. – Booboo Feb 16 '22 at 14:10
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/242073/discussion-between-ex1led-and-booboo). – ex1led Feb 16 '22 at 15:00