
I have implemented a SharedList in Python (3.7) using Manager and Lock from multiprocessing. It is shared among the processes created with the multiprocessing Process call, and each process appends the values/objects it generates to this shared list.

Implementation of SharedList using Manager and Lock from Python's multiprocessing

from multiprocessing import Manager, Lock

class SharedList(object):
    def __init__(self, limit):
        self.manager = Manager()
        # manager-backed list proxy, shared across processes
        self.results = self.manager.list([])
        self.lock = Lock()
        self.limit = limit

    def append(self, new_value):
        # stop accepting values once the limit is reached
        with self.lock:
            if len(self.results) == self.limit:
                return False
            self.results.append(new_value)
            return True

    def list(self):
        with self.lock:
            return list(self.results)

Usage of the SharedList to store values from multiple processes created with multiprocessing

from multiprocessing import Process

results = SharedList(limit)
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
    # note the trailing comma: args must be a tuple
    new_process = Process(target=child_function, args=(results,))
    processes.append(new_process)
    new_process.start()

for _process in processes:
    _process.join()

for _process in processes:
    _process.close()

Implementation of child_function

def child_function(results):
    while True:
        result = func()  # func() generates the next value to store
        if not results.append(result):
            break

The implementation works in some scenarios, but hangs when I increase the limit. I have also run the same experiment with a process count lower than the number of CPUs, and it still hangs at the same point.

Are there better approaches to this problem? I have looked into alternatives such as using a Queue, but that does not work as expected either and also hangs.

Added the previous implementation using Queue

Implementation using Queue

import multiprocessing
from time import sleep

results_out = []
manager = multiprocessing.Manager()
results = manager.Queue()
tasks = manager.Queue()
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
    new_process = multiprocessing.Process(target=child_function,
                                          args=(tasks, results))
    processes.append(new_process)
    new_process.start()

sleep(5)
# one task per value we want to collect ...
for i in range(limit):
    tasks.put(0)
sleep(1)

# ... followed by one -1 sentinel per worker
for i in range(num_processes):
    tasks.put(-1)

num_finished_processes = 0
# collect results until every worker has sent its -1 sentinel
while True:
    new_result = results.get()
    if new_result == -1:
        num_finished_processes += 1
        if num_finished_processes == num_processes:
            break
    else:
        results_out.append(new_result)

for process in processes:
    process.join()

for process in processes:
    process.close()

In child_function

def child_function(tasks, results):
    while True:
        task_val = tasks.get()
        if task_val < 0:
            # forward the sentinel so the parent knows this worker is done
            results.put(-1)
            break
        else:
            result = func()
            results.put(result)

Updated

I went through the following references before posting this question, but I am unable to get the desired output. I agree that this code can lead to a deadlock, but I have not been able to find a deadlock-free implementation using multiprocessing in Python.

References

  1. Multiprocessing of shared list

  2. https://pymotw.com/2/multiprocessing/basics.html

  3. Shared variable in python's multiprocessing

  4. https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing

  5. https://medium.com/@urban_institute/using-multiprocessing-to-make-python-code-faster-23ea5ef996ba

  6. http://kmdouglass.github.io/posts/learning-pythons-multiprocessing-module/

  7. python multiprocessing/threading cleanup

Based on the suggestions, I was able to modify SharedList to use a Queue

from multiprocessing import Manager
from time import sleep

class SharedList(object):
    def __init__(self, limit):
        self.manager = Manager()
        self.tasks = self.manager.Queue()
        self.results = self.manager.Queue()
        self.limit = limit
        # process_count is assumed to be defined globally
        self.no_of_process = min(process_count, limit)

    def setup(self):
        sleep(1)
        # one task slot per value we want to collect
        for i in range(self.limit):
            self.tasks.put(0)
        sleep(1)
        # one -1 sentinel per worker process
        for i in range(self.no_of_process):
            self.tasks.put(-1)

    def append(self, new_value):
        task_val = self.tasks.get()
        if task_val < 0:
            self.results.put(-1)
            return False
        else:
            self.results.put(new_value)
            return True

    def list(self):
        results_out = []
        num_finished_processes = 0
        # drain the results queue until every worker has reported -1
        while True:
            new_result = self.results.get()
            if new_result == -1:
                num_finished_processes += 1
                if num_finished_processes == self.no_of_process:
                    break
            else:
                results_out.append(new_result)
        return results_out

This implementation works fine with the following change to the calling code

results = SharedList(limit)
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
    new_process = Process(target=child_function, args=(results,))
    processes.append(new_process)
    new_process.start()

results.setup()

for _process in processes:
    _process.join()

for _process in processes:
    _process.close()

Implementation of child_function

def child_function(results):
    while True:
        result = func()
        if not results.append(result):
            break

But again, this ends up in a deadlock, hanging after some iterations.
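
For reference, here is a minimal, self-contained sketch of how the pieces above fit together. The process_count and limit values and the dummy func() are placeholders for my real setup, and it assumes the default fork start method on Linux (the Manager held inside SharedList may not pickle under spawn):

from multiprocessing import Process

process_count = 4    # placeholder for the real worker count
limit = 1000         # size at which the shared list is considered full

def func():
    # dummy stand-in for the real work each process performs
    return 1

def child_function(results):
    while True:
        result = func()
        if not results.append(result):
            break

if __name__ == '__main__':
    results = SharedList(limit)
    processes = [Process(target=child_function, args=(results,))
                 for _ in range(min(process_count, limit))]
    for p in processes:
        p.start()
    results.setup()
    for p in processes:
        p.join()
    print(len(results.list()))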

  • You say you tried using `multiprocessing.Queue` but are getting "hang up". Perhaps you should share the code with the problem and ask about that, since that is exactly the data structure that's intended for what you're trying to do and it hanging is probably the result of some mistake in your code. – Grismar Nov 19 '19 at 06:43
  • Your code with the queue does not deadlock for me. Can you explain where it deadlocks? And for `SharedList` I guess the interesting thing is what you do in `child_function`. Can you show this code as well? Compared to the queue implementation, it seems you now only have a single list? – Daniel Junglas Nov 19 '19 at 07:47
  • I cannot reproduce a deadlock here and I don't see an obvious problem in your code. I suggest you use a debugger to figure out where exactly the processes are hanging in case a deadlock occurs. Maybe it also helps to print from `child_function` what the different processes are doing. Or maybe the problem is in function `func()` (which you did not show)? – Daniel Junglas Nov 20 '19 at 12:44
  • This code block is actually a rough description of my implementation. I tried printing what the different processes are doing and observed the behaviour over several runs (running the multiprocessing with different values of limit, say 500 or 1000; limit is the size at which the list is considered full). After some time a single process is left running, and it hangs after a few iterations. I have also used a dummy function to rule out errors caused by ```func()``` – Amutheezan Nov 20 '19 at 16:21

2 Answers


I found the article below, based on Ray, which sounds interesting and makes parallel computing easy to implement, both effectively and time-efficiently:

https://towardsdatascience.com/modern-parallel-and-distributed-python-a-quick-tutorial-on-ray-99f8d70369b8
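
For context, a minimal sketch of the pattern that article describes: work is declared with the @ray.remote decorator and results are collected with ray.get (the limit value and the body of func() below are placeholders, not code from the article):

import ray

ray.init()

@ray.remote
def func():
    # placeholder for the real per-task work
    return 1

limit = 1000
# launch `limit` remote tasks and gather all results; Ray manages the worker processes
results_out = ray.get([func.remote() for _ in range(limit)])
print(len(results_out))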


Based on the suggestions, I was able to modify SharedList to use a Queue

from multiprocessing import Manager
from time import sleep

class SharedList(object):
    def __init__(self, limit):
        self.manager = Manager()
        self.tasks = self.manager.Queue()
        self.results = self.manager.Queue()
        self.limit = limit
        # process_count is assumed to be defined globally
        self.no_of_process = min(process_count, limit)

    def setup(self):
        sleep(1)
        # one task slot per value we want to collect
        for i in range(self.limit):
            self.tasks.put(0)
        sleep(1)
        # one -1 sentinel per worker process
        for i in range(self.no_of_process):
            self.tasks.put(-1)

    def append(self, new_value):
        task_val = self.tasks.get()
        if task_val < 0:
            self.results.put(-1)
            return False
        else:
            self.results.put(new_value)
            return True

    def list(self):
        results_out = []
        num_finished_processes = 0
        # drain the results queue until every worker has reported -1
        while True:
            new_result = self.results.get()
            if new_result == -1:
                num_finished_processes += 1
                if num_finished_processes == self.no_of_process:
                    break
            else:
                results_out.append(new_result)
        return results_out

This implementation works fine with the following change to the calling code

results = SharedList(limit)
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
    new_process = Process(target=child_function, args=(results,))
    processes.append(new_process)
    new_process.start()

results.setup()

for _process in processes:
    _process.join()

for _process in processes:
    _process.close()

Implementation of child_function

def child_function(results):
    while True:
        result = func()
        if not results.append(result):
            break