
I have a multiprocessing program. Each process takes a number from data and then inserts it into __queue_out.

But there is a problem: when the last process starts, an endless loop begins and all the processes just die.

import time
import threading
import random
from queue import Queue, PriorityQueue
from multiprocessing import Pool, Process


data = range(1, 1001)

start = time.time()
end_date = time.time() - start




class Worker(Process):
    counter = -1

    def __init__(self, queue_in, queue_out):
        super(Worker, self).__init__()
        self._daemon = Process().daemon
        # self.setDaemon(True)

        self.__queue_in = queue_in
        self.__queue_out = queue_out

    def run(self):
        while True:
            job = self.__queue_in.get()

            Worker.counter += 1
            num = Worker.counter

            print('Take: ', self.name, job)

            print('Complete: ', self.name, job)

            self.__queue_out.put((num, job))
            self.__queue_in.task_done()


queue = Queue()
res = PriorityQueue()


for i in data:
    queue.put(i)


for i in range(1):
    w = Worker(queue, res)
    w.start()

queue.join()

out = []
while not res.empty():
    out.append(res.get()[1])


print(out)
print(end_date)
lakvak
    Are you familiar with the Global Interpreter Lock (GIL) in python? https://realpython.com/python-gil/#the-impact-on-multi-threaded-python-programs – mrzo Apr 16 '20 at 06:36
  • @mrzo Yes, I read about the GIL. I think that you can bypass it and speed up the program using multiprocessing, but I don’t know how to redo my code correctly. – lakvak Apr 16 '20 at 06:49
  • Because of the GIL, adding more threads does nothing but increase the overhead of doing the processing because threads do not really run concurrently in Python for the most part. The exceptions are when they do I/O or call external modules written in some other language. – martineau Apr 16 '20 at 07:17
  • @martineau OK, I get that. How can I redo the code so that instead of threads it starts processes? – lakvak Apr 16 '20 at 07:19
  • I recommend [`concurrent.futures.ProcessPoolExecutor`](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor). You can also use the [`multiprocessing`](https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing) module directly (see the sketch after these comments). – martineau Apr 16 '20 at 07:22
  • @martineau I managed to start several processes, but there was another problem associated with them. I updated the question – lakvak Apr 16 '20 at 07:30
  • There are numerous changes that need to be made to your code. Each process runs in its own memory space, so global variables can't be shared between different processes. You also need to use a `multiprocessing.Queue` (or `multiprocessing.JoinableQueue` if you want `join()`), not a `queue.Queue`, since the latter can't be shared between processes. Depending on what OS you're using, you may need to add an `if __name__ == '__main__':` "guard" around code that you only want to run in the main process, otherwise it will get executed every time a subprocess is started. In conclusion, I think you need to have a better understanding of how multiprocessing in general works. – martineau Apr 16 '20 at 08:19
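
Below is a minimal sketch of the `concurrent.futures.ProcessPoolExecutor` approach recommended in the comments above. The `process_item` function and the `chunksize` value are placeholder assumptions, since the original worker only forwards each number to the output queue.

from concurrent.futures import ProcessPoolExecutor


def process_item(n):
    # Placeholder for the real per-item work; the original worker just
    # forwards the number to the output queue.
    return n * n


if __name__ == '__main__':
    data = range(1, 1001)
    with ProcessPoolExecutor(max_workers=4) as executor:
        # map() yields results in input order, so no counter or
        # priority queue is needed to preserve the ordering.
        out = list(executor.map(process_item, data, chunksize=50))
    print(out[:10])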

1 Answer


Here's how to do it with the multiprocessing module. Note I had to change the queues to multiprocessing.Queue and multiprocessing.JoinableQueue. Also note that there is no multiprocessing.PriorityQueue, so I changed it to a regular Queue; however, it looks like there may be a relatively easy workaround (see Strange Queue.PriorityQueue behaviour with multiprocessing in Python 2.7.6, even though it's an old question). A sketch of that workaround appears after the code below.

#from queue import PriorityQueue
from multiprocessing import JoinableQueue, Process, Queue
import time


class Worker(Process):
    # Class-level job counter; each process gets its own copy of this attribute,
    # so the numbering is only meaningful while a single worker is running.
    counter = -1

    def __init__(self, queue_in, queue_out):
        super().__init__(daemon=True)
        self._queue_in = queue_in
        self._queue_out = queue_out

    def run(self):
        while True:
            job = self._queue_in.get()

            Worker.counter += 1
            num = Worker.counter

#            print('Take:: ', self.name, job)
#            print('Complete: ', self.name, job)

            self._queue_out.put((num, job))
            self._queue_in.task_done()


if __name__ == '__main__':

    start = time.time()
    data = range(1, 1001)
    queue = JoinableQueue()
#    res = PriorityQueue()  # No multiprocessing.PriorityQueue.
    res = Queue()

    for i in data:
        queue.put(i)

    for i in range(1):
        w = Worker(queue, res)
        w.start()

    queue.join()

    out = []
    while not res.empty():
        out.append(res.get()[1])

    print(out)
    print('elapsed time:', time.time() - start)
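
If the priority-queue behaviour is actually needed, one possible workaround (an assumption based on the linked question, not something the answer's code does) is to serve queue.PriorityQueue from a manager process, so that worker processes share a single proxied instance:

from multiprocessing.managers import SyncManager
from queue import PriorityQueue


class PQManager(SyncManager):
    pass


# Register the stdlib PriorityQueue so the manager can hand out shared proxies.
PQManager.register('PriorityQueue', PriorityQueue)


if __name__ == '__main__':
    manager = PQManager()
    manager.start()                   # server process that owns the queue
    res = manager.PriorityQueue()     # proxy object; could be passed to Worker(...)
    res.put((2, 'second'))
    res.put((1, 'first'))
    print(res.get())                  # (1, 'first') -- lowest priority first
    manager.shutdown()

The proxy behaves like the original queue for put() and get(), but the manager process has to stay alive for as long as the queue is in use.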
martineau