
I want to save all of the data before terminating the process. I am on a Windows machine, and everything works fine as long as I don't terminate the process. I tried the signal library, but it only covers UNIX-style signals, so it does not help here. So the problem comes down to intercepting the termination on Windows, which I don't know how to do. The atexit library did not help either (a minimal sketch of that attempt follows the code below), and neither did adding the save_stuff method. Does anyone have an idea how to do this?

The primary task is to stop the program after some time and still collect, outside the process, all the data it has produced so far.

from multiprocessing import Queue, Process

class Worker:

    def __init__(self):
        self.workers = 1

    def work(self, n):
        for i in range(n):
            self.workers = i
            print(i)

    def __str__(self):
        return str(self.workers)


class MyProcess(Process):

    def __init__(self, n):
        self.worker = Worker()
        self.shared_obj = Queue()
        self.shared_obj.put(self.worker)
        self.args = n
        super().__init__()

    def run(self):

        self.worker.work(self.args)

        self.shared_obj.get(self.worker)
        self.shared_obj.put(self.worker)

    def save_stuff(self):

        self.shared_obj.get(self.worker)
        self.shared_obj.put(self.worker)
        print('collect data')


if __name__ == '__main__':
    p = MyProcess(1000000)

    p.start()
    p.join(1)

    if p.is_alive():

        p.save_stuff()

        p.terminate()
        print('killed worker')
        print('shared object ' + str(p.shared_obj.get()))

    else:
        print('he was in time this worker')
        print('shared object ' + str(p.shared_obj.get()))
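For reference, a minimal sketch of the `atexit` attempt mentioned above (the names are illustrative, not the real program): the handler registered inside the child never runs, because `terminate()` kills the child without any cleanup.

import atexit
import time
from multiprocessing import Process


def save_stuff():
    print('saving...')  # never printed: terminate() skips atexit handlers


def work():
    atexit.register(save_stuff)
    time.sleep(10)  # stands in for the long-running loop


if __name__ == '__main__':
    p = Process(target=work)
    p.start()
    p.join(1)
    p.terminate()  # hard kill, no cleanup happens in the child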
Pavlo
  • Have you looked at [atexit](https://docs.python.org/3.2/library/atexit.html) and [sys.excepthook](https://docs.python.org/2/library/sys.html#sys.excepthook)? – Andreas Storvik Strauman Sep 07 '18 at 16:24
  • The problem is to intercept the signal when calling `process.terminate()`. `atexit` did not work, and I am not sure `sys.excepthook` is what I need. – Pavlo Sep 07 '18 at 16:50

2 Answers


Calling p.save_stuff() from the parent process will not work. The two processes run in different address spaces, so the parent never sees the attributes modified in the child. Since you update the data in the child and read it from the parent, it is safer to use shared memory, updated at every iteration of the loop. The following code should achieve what you are trying to do here.

from multiprocessing import Value, Process

class Worker:

    def __init__(self):
        self.workers = 1

    def work(self, n, shm):
        for i in range(n):
            self.workers = i
            shm.value = self.workers  # publish the latest value to shared memory on every iteration
            print(i)

    def __str__(self):
        return str(self.workers)


class MyProcess(Process):

    def __init__(self, n, shm):
        self.worker = Worker()
        self.args = n
        self.shm = shm
        super().__init__()


    def run(self):
        self.worker.work(self.args, self.shm)



if __name__ == '__main__':
    shm = Value("i", 0)  # shared C int, visible to both parent and child
    p = MyProcess(1000000, shm)
    p.start()
    p.join(1)

    if p.is_alive():
        p.terminate()
        print('killed worker')
        print('shared object ' + str(shm.value))

    else:
        print('he was in time this worker')
        print('shared object ' + str(shm.value))
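If more than a single number needs to be saved, the same pattern works with `multiprocessing.Array`; a sketch, assuming the intermediate results fit into a fixed-size numeric buffer (the buffer length of 10 is arbitrary):

from multiprocessing import Array, Process


def work(buf, n):
    # write intermediate results into the shared buffer as they are produced
    for i in range(n):
        buf[i % len(buf)] = i


if __name__ == '__main__':
    buf = Array('i', 10)  # shared fixed-size buffer of C ints, zero-initialized
    p = Process(target=work, args=(buf, 1000000))
    p.start()
    p.join(1)
    if p.is_alive():
        p.terminate()
        print('killed worker')
    print('shared buffer ' + str(list(buf)))  # whatever was written before termination survives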
Ajay Srivastava

In your code, `self.worker.work(self.args)` inside `run` blocks until the whole loop finishes. If you simply terminate the process, the part that sends anything back to the parent never runs.

Instead we need a way to let the process finish gracefully, so it can send the object back to the parent. `worker.run` must not block the main thread for that, so the code below wraps it in an extra thread. The main thread in the child process starts this worker thread and then runs a while loop that checks both for any message sent over a pipe and for the worker thread still being alive. The loop breaks either when the worker finishes naturally or when the parent sends a "poison pill". At that point the child saves and sends the instance, and the parent can `.get()` it.

import time
import logging
from threading import Thread
from multiprocessing import Process, Pipe


def init_logging(level=logging.DEBUG):
    fmt = '[%(asctime)s %(levelname)-8s %(processName)s' \
          ' %(funcName)s()] --- %(message)s'
    logging.basicConfig(format=fmt, level=level)


class Worker:

    def __init__(self, n):
        self.n = n

    def run(self):
        for i in range(int(self.n)):
            self.n -= 1
        return self

    def __str__(self):
        return f'{self.n}'

    def __repr__(self):
        return f'Worker(n={self.n})'


class MyProcess(Process):

    def __init__(self, n, log_level=logging.DEBUG):
        super().__init__()
        self.args = n
        self.log_level = log_level

        self.worker = None
        self.worker_thread = None
        self.parent_conn, self.child_conn = Pipe()

        logging.getLogger().debug('process instantiated')

    def run(self):
        init_logging(self.log_level)
        logging.getLogger().debug('process started')
        self.worker = Worker(self.args)
        self.worker_thread = Thread(target=self.worker.run)
        self.worker_thread.daemon = True
        self.worker_thread.start()

        while not self.child_conn.poll() and self.worker_thread.is_alive():
            self.worker_thread.join(0.5)  # heartbeat for checking
        self._save()

    def _save(self):
        """Send worker instance to parent."""
        logging.getLogger().debug('sending instance to parent')
        self.child_conn.send(self.worker)
        self.child_conn.close()

    def close(self):
        """Close process and receive result."""
        logging.getLogger().debug('closing process')
        # The actual value we are sending to child does not matter because
        # the while loop in `run` will break upon receipt of any object.
        self.parent_conn.send('POISON')

    def get(self):
        """Get result from child."""
        logging.getLogger().debug('get result from child')
        self.worker = self.parent_conn.recv()
        return self.worker

I tested this under Linux, but with the start method set to 'spawn' (the default on Windows), so I expect it to run there as well.
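To force the same behaviour when testing on Linux, the start method can be set explicitly; a minimal sketch using the standard-library call (it must run once, inside the `__main__` guard, before any process is started):

import multiprocessing

multiprocessing.set_start_method('spawn')  # 'spawn' is the default on Windows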

if __name__ == '__main__':

    init_logging()
    logger = logging.getLogger()

    p = MyProcess(100e6)  # try 10 vs 100e6 to toggle behaviour

    p.start()
    p.join(2)

    if p.is_alive():
        p.close()
        p.get()
        logger.info('killed worker')
        time.sleep(0.1)  # just to keep stdout in order
        print('shared object ' + repr(p.worker))

    else:
        p.get()
        logger.info('worker was in time')
        time.sleep(0.1)  # just to keep stdout in order
        print('shared object ' + repr(p.worker))

    assert isinstance(p.worker, Worker)
    p.join()

Example Output:

[2018-09-08 05:27:46,316 DEBUG    MainProcess __init__()] --- process instantiated
[2018-09-08 05:27:46,370 DEBUG    MyProcess-1 run()] --- process started
[2018-09-08 05:27:48,321 DEBUG    MainProcess close()] --- closing process
[2018-09-08 05:27:48,322 DEBUG    MainProcess get()] --- get result from child
[2018-09-08 05:27:48,396 DEBUG    MyProcess-1 _save()] --- sending instance to parent
[2018-09-08 05:27:48,402 INFO     MainProcess <module>()] --- killed worker
shared object Worker(n=82683682.0)

Process finished with exit code 0

Note that `worker.n` counted down from 100M to 82.68M within the 2 seconds before the `.close()` call.

Darkonaut
  • So, calling `close()` and `get()` will kill the process? There is no need to call the `terminate()` method? – Pavlo Sep 10 '18 at 08:53
  • @white space If you look at the code, `.close()` is sending a sentinel value, and this allows the process to finish gracefully. It's not recommended to `terminate()` a process if you can end it gracefully, because it can corrupt shared resources and cause deadlocks [see docs for more](https://docs.python.org/3.6/library/multiprocessing.html#multiprocessing.Event). – Darkonaut Sep 10 '18 at 12:57
  • @white space No, you cannot use a process instead of a thread here, because this construct relies on the fact that the thread shares the same virtual memory as the main thread in the child process. It's also much cheaper to create a thread, so it wouldn't make much sense to waste another process. – Darkonaut Sep 10 '18 at 12:58
  • @Darkonaut Will there be any performance problems if a lot of different processes/threads are created inside the thread? – Pavlo Sep 10 '18 at 17:26
  • @white space You cannot create a process _inside_ a thread; maybe [this](https://stackoverflow.com/a/52083532/9059420) helps to understand the concepts. If you create multiple worker threads in your child process, then yes, this will hurt performance, because there will be no parallel execution for threads (for Python code, as opposed to GIL-releasing extensions like numpy or I/O). But you can just spawn multiple child processes like I have shown you above to have parallel execution. It only makes sense up to the number of CPU cores you have, of course. – Darkonaut Sep 10 '18 at 17:38
  • Later I want to achieve true parallelism inside the worker, because I have to do some heavy computations which can be done in parallel, and I have to take advantage of all CPU cores. So it is obvious I have to use `Process` instead of `Thread`. Is the idea similar for that task? – Pavlo Sep 11 '18 at 16:00
  • @white space I have the feeling you are still getting something wrong. You have subclassed a process which runs one worker thread; the main thread in this child process only controls this worker thread and does nothing else. If you need multiple workers to run something in parallel, you just instantiate multiple of these subclassed processes from the answer, that's all (see the sketch below). – Darkonaut Sep 11 '18 at 16:06
  • @Darkonaut I actually got confused by the statement that I cannot create a `Process` inside a `Thread`. My idea was to create new `Process`es inside the `run` method of the `Worker` class. It worked, at least I could run a simple example. See the modified code of your example [here](https://pastebin.com/cM3iB7ur). Following your suggestion, I could instantiate multiple of the subclassed processes, but I don't have to save data again before stopping. So my question is: using this modified [example](https://pastebin.com/cM3iB7ur), is it possible to achieve true parallelism in this case? – Pavlo Sep 12 '18 at 10:55
  • @white space You can only order the creation of a process from a thread. Processes each have their own memory space, so they don't exist within threads, but you can hold a reference to them, of course. Yes, if you don't have data to save you can do that, and processes will always run in parallel as long as there are enough cores for them. But your extension is overly complex without a real benefit; you could just take a normal process and pass your function to run. Have a look at pools in the multiprocessing module if you haven't already. – Darkonaut Sep 12 '18 at 13:17
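A minimal sketch of that suggestion, reusing `MyProcess` and `init_logging` from the answer above (the count of four processes is arbitrary): each instance runs its own worker thread and can be stopped and drained exactly like the single one.

if __name__ == '__main__':
    init_logging()

    # several independent MyProcess instances run in parallel on separate cores
    procs = [MyProcess(100e6) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join(2)
    for p in procs:
        if p.is_alive():
            p.close()          # send the sentinel so the child saves and exits
        print(repr(p.get()))   # receive the Worker instance from the child
        p.join()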