
I tried following this solution as well as this one, but thus far I have been unsuccessful.

When I run the following block of code:

import logging
import sys
import traceback
from multiprocessing import Manager, Pool

logger = logging.getLogger(__name__)

global manager
global lock
manager = Manager()
lock = manager.Lock()

class MyClass(object):

    def get_next_chunk(self, numberlist, chunks):
        for i in range(0, len(numberlist), chunks):
            yield numberlist[i:i + chunks]

    def multi_process(self, numberlist):
        procs = 5
        chunksize = 100
        with Pool(procs) as pool:
            pool.map(self.process_numberlist,
                     self.get_next_chunk(numberlist, chunksize))
        return self.running_total_list

    def process_numberlist(self, numberlist):
        temp_num_list = []
        temp_num_list = self.getnewNumbers()
        logger.debug("temp_num_list length: " + str(len(temp_num_list)))
        try:
            lock.acquire()
        except Exception as e:
            logger.error("Couldn't acquire lock")
            logger.error(e)
            traceback.format_exc()
            logger.error(sys.exc_info()[0])
        self.running_total_list = self.running_total_list + temp_num_list
        logger.debug("New running_total_list length: "
                    + str(len(self.running_total_list)))
        lock.release()

The output in my logs looks like:

[process_numberlist() ] temp_num_list length: 5
[process_numberlist() ] New running_total_list result set length: 5
[process_numberlist() ] temp_num_list length: 6
[process_numberlist() ] New running_total_list result set length: 6
[process_numberlist() ] temp_num_list length: 4
[process_numberlist() ] New running_total_list result set length: 9

whereas I believe the expected output should look like:

[process_numberlist() ] temp_num_list length: 5
[process_numberlist() ] New running_total_list result set length: 5
[process_numberlist() ] temp_num_list length: 6
[process_numberlist() ] New running_total_list result set length: 11
[process_numberlist() ] temp_num_list length: 4
[process_numberlist() ] New running_total_list result set length: 15

Edit - Attempt 2

See the update based on Aaron's suggestion. I am now receiving a 'can only join an iterable' error.

from functools import partial
from multiprocessing import Manager, Pool

global manager
global lock

class MyClass(object):

    def get_next_chunk(self, numberlist, chunks):
        for i in range(0, len(numberlist), chunks):
            yield numberlist[i:i + chunks]

    def multi_process(self, numberlist):
        procs = 5
        chunksize = 100
        manager = Manager()
        lock = manager.Lock()
        with Pool(procs) as pool:
            func = partial(self.process_numberlist, lock)
            pool.map(func,
                     self.get_next_chunk(numberlist, chunksize))
        return self.running_total_list

    def process_numberlist(self, numberlist, lock):
        temp_num_list = []
        temp_num_list = self.getnewNumbers()
        logger.debug("temp_num_list length: " + str(len(temp_num_list)))
        try:
             lock.acquire()
             self.running_total_list = self.running_total_list + temp_num_list
             logger.debug("New running_total_list length: "
                + str(len(self.running_total_list)))
             lock.release()
        except Exception as e:
            logger.error("Couldn't acquire lock")
            logger.error(e)
            traceback.format_exc()
            logger.error(sys.exc_info()[0])

Edit #3 - getnewNumbers(), which is not included in this toy example, simply returns an array of integers. Hope that helps.
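For reproduction purposes, a hypothetical stand-in method on MyClass (not my real method) could look like:

import random

def getnewNumbers(self):
    # Hypothetical stand-in: returns a list of integers of random
    # length, mimicking what the real method returns.
    return [random.randint(0, 100) for _ in range(random.randint(4, 11))]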

Bryce
  • Your `global lock` and `global manager` are only global within the context of a process. They need to be passed to the new process as an argument at the time of creation. They are currently being re-created (so you have multiple different locks, not one shared one) when the child process imports the `__main__` file. – Aaron May 09 '19 at 17:48
  • @Aaron - I tried updating to pass in an instance of the lock via a partial, but now getting an error. I updated my original post. Any suggestions? – Bryce May 09 '19 at 19:32
  • If you "have" to use a file, then disregard the following question. Have you tried using an ACID compliant database (something like sqlite since this seems to be pretty small) for this? It appears like you're trying to implement atomic file operations across processes, in Python, which sounds like a headache IMO. – Skam May 09 '19 at 19:41
  • @Skam - Sorry for the misleading debug / log output. I am not 'locking' a file or accessing a file at all. I am simply trying to concatenate quickly via multiprocessing. I'll update the original post – Bryce May 09 '19 at 19:44
  • In your original code, you try to update `running_total_list` if you *don't* acquire the lock, since it is in the `except` clause. In the update, you try to update the value whether or not you acquire the lock. – chepner May 09 '19 at 19:58
  • That was a mistake on my part due to copy-pasting from my text editor. This has been updated @chepner – Bryce May 09 '19 at 20:02
  • Ok, now you're *always* updating the value whether or not you acquire the lock. The only thing your `try` statement does is acquire the lock, or log a failure to do so. Then you update the value and (try to) release the lock. Both of those actions should be in the `try` block as well, since you shouldn't do either if you fail to acquire the lock in the first place. – chepner May 09 '19 at 20:05
  • 1
    (I'm not sure what exception you think might be raised, though. `acquire` isn't documented to raise any; it either blocks until the lock *is* acquired, or returns `False` if you request a non-blocking attempt to acquire it.) – chepner May 09 '19 at 20:08
  • This is also not a complete example; there are several undefined names (`getnewNumbers`, `vulns`, etc) that prevent someone from reproducing your errors. – chepner May 09 '19 at 20:17
  • @chepner - Fair enough. I have moved the array concatenation into the try. The try/except was originally there so I could get the full output of the exception. I still have the same issue. I have also removed the vulns reference and provided an explanation of what getnewNumbers() does in Edit #3 – Bryce May 09 '19 at 20:18
  • @Bryce I cannot see in the code which data are you protecting with the lock. Every process holds its copy of `self.running_total_list` variable, they are not shared. – Andrii Maletskyi May 13 '19 at 16:18

2 Answers


You seem to be confusing OOP concepts with IPC.

Here I create an instance of class A as a in the mother process and call the method a.go from that same mother process. When a.go calls multiprocessing.Pool(2), two child processes are created. Now we have three processes: one mother and two kids.

Each one has its own version of a: one mother and two kids, but three versions of the instance a. I created just one instance of A in the mother, so who created the other two? The OS and pickling in action. A kid gets copies of all of its mother's objects when the OS creates it, so if a child modifies its version of a, the other versions are not affected.

import multiprocessing
import os

class A:
    def __init__(self):
        self.numbers = []

    def add(self, n):
        self.numbers.append(n)
        me = multiprocessing.current_process()
        print('mom: {}, my-pid: {}, data: {}'.format(os.getppid(), me.ident,
                                                     self.numbers))

    def go(self):
        with multiprocessing.Pool(2) as workers:
            workers.map(self.add, range(1, 4))


if __name__ == '__main__':
    a = A()
    a.go()
    print('pid: {}, data: {}'.format(multiprocessing.current_process().ident,
                                     a.numbers))

Output:

mom: 10029, my-pid: 10030, data: [1]
mom: 10029, my-pid: 10031, data: [2]
mom: 10029, my-pid: 10030, data: [3]
pid: 10029, data: []

There are just two kids here, pid 10030 and pid 10031, and between them they appended 3 items to a.numbers. So one of them must have appended two items, yet each print shows only the item just appended: the kid with pid 10030 should show [1, 3]. What's going on here?

Let's initialize the a.numbers list to [0] in the mother and print a.numbers before appending in the kids.

import multiprocessing
import os

class A:
    def __init__(self):
        self.numbers = []

    def add(self, n):
        me = multiprocessing.current_process()
        print('mom: {}, my-pid: {}, previous-data: {}'.format(
            os.getppid(), me.ident, self.numbers))
        self.numbers.append(n)
        print('mom: {}, my-pid: {}, current-data: {}'.format(
            os.getppid(), me.ident, self.numbers))

    def go(self):
        with multiprocessing.Pool(2) as workers:
            workers.map(self.add, range(1, 4))


if __name__ == '__main__':
    a = A()
    a.numbers.append(0)
    a.go()
    print('pid: {}, data: {}'.format(multiprocessing.current_process().ident,
                                     a.numbers))

Output:

mom: 10407, my-pid: 10408, previous-data: [0]
mom: 10407, my-pid: 10408, current-data: [0, 1]
mom: 10407, my-pid: 10409, previous-data: [0]
mom: 10407, my-pid: 10409, current-data: [0, 2]
mom: 10407, my-pid: 10408, previous-data: [0]
mom: 10407, my-pid: 10408, current-data: [0, 3]
pid: 10407, data: [0]

Whatever the mother had in a.numbers showed up in the kids. But the child with pid 10408, which appended two items, still didn't preserve the item it had appended earlier.

Let's now check whether the instance a whose data we're mutating is the same instance each time, or a different copy, even when the pid is the same.

import multiprocessing
import os

class A:
    def __init__(self):
        self.numbers = []

    def __str__(self):
        return '<{}>'.format(', '.join(str(x) for x in self.numbers))

    def __del__(self):
        me = multiprocessing.current_process()
        print("I'm being destroyed, my pid: {}, data: {}".format(me.ident, self))

    def add(self, n):
        me = multiprocessing.current_process()
        self.numbers.append(n)
        print('mom: {}, my-pid: {}, current-data: {}'.format(
            os.getppid(), me.ident, self.numbers))

    def go(self):
        with multiprocessing.Pool(2) as workers:
            workers.map(self.add, range(1, 4))


if __name__ == '__main__':
    a = A()
    a.numbers.append(0)
    a.go()
    print('pid: {}, data: {}'.format(multiprocessing.current_process().ident,
                                     a.numbers))

Output:

mom: 11881, my-pid: 11883, current-data: [0, 2]
mom: 11881, my-pid: 11882, current-data: [0, 1]
I'm being destroyed, my pid: 11882, data: <0, 1>
I'm being destroyed, my pid: 11883, data: <0, 2>
mom: 11881, my-pid: 11883, current-data: [0, 3]
I'm being destroyed, my pid: 11883, data: <0, 3>
pid: 11881, data: [0]
I'm being destroyed, my pid: 11881, data: <0>

From the above output it's clear that the child processes are not terminated: the pids stay the same, yet the object a is destroyed after each task. So the process stays the same, but the instance a is copied from the mother for each task.

How do we share objects among processes? multiprocessing.Manager to the rescue.

import multiprocessing
import os


class A:
    def __init__(self):
        manager = multiprocessing.Manager()
        self.numbers = manager.list()

    def __str__(self):
        return '<{}>'.format(self.numbers)

    def __del__(self):
        me = multiprocessing.current_process()
        print("I'm being destroyed, my pid: {}, data: {}".format(
            me.ident, self))

    def add(self, n):
        me = multiprocessing.current_process()
        self.numbers.append(n)
        print('mom: {}, my-pid: {}, current-data: {}'.format(
            os.getppid(), me.ident, self.numbers))

    def go(self):
        with multiprocessing.Pool(2) as workers:
            workers.map(self.add, range(1, 4))


if __name__ == '__main__':
    a = A()
    a.numbers.append(0)
    a.go()
    print('pid: {}, data: {}'.format(multiprocessing.current_process().ident,
                                     a.numbers))

Output:

mom: 12296, my-pid: 12303, current-data: [0, 1]
I'm being destroyed, my pid: 12303, data: <[0, 1, 2]>
mom: 12296, my-pid: 12304, current-data: [0, 1, 2]
I'm being destroyed, my pid: 12304, data: <[0, 1, 2]>
mom: 12296, my-pid: 12303, current-data: [0, 1, 2, 3]
I'm being destroyed, my pid: 12303, data: <[0, 1, 2, 3]>
pid: 12296, data: [0, 1, 2, 3]
I'm being destroyed, my pid: 12296, data: <<ListProxy object, typeid 'list' at 0x7f69aa037048; '__str__()' failed>>

The data is now shared among the processes, but with some overhead.

import multiprocessing

class A:
    def __init__(self):
        print('children: {}'.format(multiprocessing.active_children()))
        manager = multiprocessing.Manager()
        print('children: {}'.format(multiprocessing.active_children()))
        self.numbers = manager.list()

if __name__ == '__main__':
    a = A()

Output:

children: []
children: [<ForkProcess(SyncManager-1, started)>]

There's an extra process running just to share the objects.

How do we solve the problem without that overhead? Have the children process and return the data, and build the list in the mother.

import multiprocessing


class A:
    def __init__(self):
        self.numbers = []

    def add(self, n):
        return [n]

    def go(self):
        with multiprocessing.Pool(2) as workers:
            for lst in workers.map(self.add, range(1, 4)):
                self.numbers.extend(lst)
            print('active children: {}'.format(
                [p.ident for p in multiprocessing.active_children()]))


if __name__ == '__main__':
    a = A()
    a.numbers.append(0)
    a.go()
    print('pid: {}, data: {}'.format(multiprocessing.current_process().ident,
                                     a.numbers))

Output:

active children: [13436, 13435]
pid: 13434, data: [0, 1, 2, 3]
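Applied to the code in the question, this last pattern would make the two methods of MyClass look roughly like the following sketch (assuming getnewNumbers returns a plain list of integers):

def multi_process(self, numberlist):
    procs = 5
    chunksize = 100
    with Pool(procs) as pool:
        # map returns one list per chunk; concatenate them in the mother.
        for chunk_result in pool.map(self.process_numberlist,
                                     self.get_next_chunk(numberlist, chunksize)):
            self.running_total_list.extend(chunk_result)
    return self.running_total_list

def process_numberlist(self, numberlist):
    # Return this chunk's result instead of mutating shared state.
    return self.getnewNumbers()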
Nizam Mohamed

It seems to me that your main goal here is access to a shared resource (running_total_list), which is why I am focusing on that in particular.

In your example you used Pool, whereas I used Process. You may have a look at this article on the core differences between the two and decide which is more suitable for your use case.

I came up with this quick example of how to share resources between multiple processes. It should give you a good idea of how to proceed from there:

from multiprocessing import Process, Lock, Manager
import random

def gen_numbers():
    # Return a list of integers of random length (4 to 10 items).
    return [i for i in range(random.randint(4, 11))]

def process_numberlist(lock, shared_list, num):
    temp_num_list = gen_numbers()
    print("Proc %s: temp_num_list length: %s" % (num, len(temp_num_list)))

    try:
        lock.acquire()
        shared_list += temp_num_list
        print("Proc %s: New shared_list length: %s" % (num, len(shared_list)))
    finally:
        lock.release()

if __name__ == '__main__':
    lock = Lock()
    manager = Manager()
    shared_list = manager.list()

    procs = 5
    proc_list = []

    for num in range(procs):
        p = Process(target=process_numberlist,
                    args=(lock, shared_list, num + 1))
        p.start()
        proc_list.append(p)

    for p in proc_list:
        p.join()

One important thing to notice is the definition of shared_list here. Unlike threads, each process has its own memory space (Pool is no exception), which is why sharing data between them didn't work. This means you need some sort of interprocess communication (IPC), and luckily Python already gives you some tools for that. One of them is multiprocessing.Manager. It exposes some data structures (like dict or list) which you can use to share data between your processes.
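A minimal sketch of the same idea with a manager-backed dict (hypothetical names, just to illustrate the proxy):

from multiprocessing import Manager, Process

def record(shared_dict, key, value):
    # The proxy forwards this mutation to the manager process,
    # so every worker sees the same underlying dict.
    shared_dict[key] = value

if __name__ == '__main__':
    manager = Manager()
    shared_dict = manager.dict()
    workers = [Process(target=record, args=(shared_dict, i, i * i))
               for i in range(3)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(dict(shared_dict))  # e.g. {0: 0, 1: 1, 2: 4}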

The same goes for Lock in this case. This is important, since you do not want multiple processes writing to shared memory at the same time; that would make your program unpredictable.
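These lock objects also work as context managers, so the acquire/release in process_numberlist above could be written more compactly as:

    with lock:
        # Acquired on entry, released automatically on exit,
        # even if an exception is raised in between.
        shared_list += temp_num_list
        print("Proc %s: New shared_list length: %s" % (num, len(shared_list)))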

Another thing to note is that process_numberlist is not necessarily executed in order, since the processes run independently of each other, but they all have access to the same shared resource.
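If the ordering of results matters, note that Pool.map (which the question uses) returns results in input order even though the workers run concurrently; a minimal sketch:

from multiprocessing import Pool

def square(n):
    return n * n

if __name__ == '__main__':
    with Pool(2) as pool:
        # Results come back in input order, regardless of
        # which worker finishes first.
        print(pool.map(square, range(5)))  # [0, 1, 4, 9, 16]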

I hope that helps!

nullchimp