
I have a set of long-running processes in a typical "pub/sub" setup, with queues for communication.

I would like to do two things, and I can't figure out how to accomplish both simultaneously:

  1. Addition/removal of workers. For example, I want to be able to add extra consumers if I see that my pending queue size has grown too large.
  2. Watchdog for my processes - I want to be notified if any of my producers or consumers crashes.

I can do (2) in isolation:

# workers, consumers, and logger are defined elsewhere in the program
from time import sleep

try:
    while True:
        for process in workers + consumers:
            if not process.is_alive():
                logger.critical("%-8s%s died!", process.pid, process.name)
        sleep(3)
except KeyboardInterrupt:
    # Python propagates CTRL+C to all workers, no need to terminate them
    logger.warning('Received CTRL+C, shutting down')

The loop above blocks, which prevents me from doing (1).

So I decided to move the code into its own process.

This doesn't work, because process.is_alive() only works for a parent checking the status of its children. In this case, the processes I want to check would be siblings instead of children.

I'm a bit stumped on how to proceed. How can my main process support changes to subprocesses while also monitoring subprocesses?

knite
  • You cannot do that directly, at least not in a way you could call "readable code that makes sense". To manage it you will need a level of abstraction that distributes jobs to workers that can receive commands to scale up/down. Frankly, it is quite a complex thing to write, and there are ready-made systems that do it; have a look at Celery. – Tymoteusz Paul Oct 08 '14 at 01:12
  • @Puciek I've used Celery on other projects. It serves a different use case (AFAIK) - kicking off async jobs. I've never heard of using it to manage long-running producers and consumers. – knite Oct 08 '14 at 01:15
  • You can very well use it to launch long-running jobs, including consumer servers - all scripts were created equal in the end, just remember to disable the time-out. And it comes with the autoscale feature you seem to be looking for. – Tymoteusz Paul Oct 08 '14 at 01:17
  • @knite Why not just run the code in a background thread in the parent process? It spends most of its time sleeping, so the time it spends with the GIL shouldn't hurt performance of the main process in a noticeable way. – dano Oct 08 '14 at 01:23
  • @dano That's an interesting idea! I've been so focused on figuring out the tricky multiprocessing bits that I hadn't considered using a thread for the watchdog. – knite Oct 08 '14 at 01:25

1 Answer


multiprocessing.Pool actually has a watchdog built-in already. It runs a thread that checks every 0.1 seconds to see if a worker has died. If it has, it starts a new one to take its place:

def _handle_workers(pool):
    thread = threading.current_thread()

    # Keep maintaining workers until the cache gets drained, unless the pool
    # is terminated.
    while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
        pool._maintain_pool()
        time.sleep(0.1)
    # send sentinel to stop workers
    pool._taskqueue.put(None)
    debug('worker handler exiting')

def _maintain_pool(self):
    """Clean up any exited workers and start replacements for them.
    """
    if self._join_exited_workers():
        self._repopulate_pool()

This is primarily used to implement the maxtasksperchild keyword argument, and it is actually problematic in some cases. If a process dies while a map or apply call is running, and the dead process was in the middle of handling a task associated with that call, the call will never finish. See this question for more information about that behavior.
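For context, using that keyword looks something like this (a minimal sketch, not code from the question; the worker function is made up):

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    # Each worker exits after 10 tasks; Pool's worker-handler thread
    # notices the exit and starts a replacement process.
    pool = multiprocessing.Pool(processes=4, maxtasksperchild=10)
    try:
        print(pool.map(square, range(100)))
    finally:
        pool.close()
        pool.join()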

That said, if you just want to know that a process has died, you can just create a thread (not a process) that monitors the pids of all the processes in the pool, and if the pids in the list ever change, you know a process has crashed:

import time

def monitor_pids(pool):
    # remember the pids of the current workers; pool._pool is the
    # Pool's internal list of worker Process objects
    pids = [p.pid for p in pool._pool]
    while True:
        new_pids = [p.pid for p in pool._pool]
        if new_pids != pids:
            print("A worker died")
            pids = new_pids
        time.sleep(3)
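You would typically start that monitor from the parent process in a daemon thread so it doesn't block your main loop (a small usage sketch, assuming `pool` is your multiprocessing.Pool):

import threading

watchdog = threading.Thread(target=monitor_pids, args=(pool,))
watchdog.daemon = True
watchdog.start()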

Edit:

If you're rolling your own Pool implementation, you can just take a cue from multiprocessing.Pool, and run your monitoring code in a background thread in the parent process. The checks to see if the processes are still running are quick, so the time lost to the background thread taking the GIL should be negligible. Consider that the multiprocessing.Pool watchdog runs every 0.1 seconds! Running yours every 3 seconds shouldn't cause any problems.
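As a rough sketch of that approach (the WorkerManager class and all of its names are invented here for illustration, not part of multiprocessing), you could keep the worker Process objects in the parent, expose add/remove methods for scaling, and let a daemon thread restart anything that dies:

import threading
import time
from multiprocessing import Process

def consumer():
    # placeholder for a long-running consumer loop
    while True:
        time.sleep(1)

class WorkerManager:
    def __init__(self, target):
        self.target = target
        self.workers = []
        self.lock = threading.Lock()

    def add_worker(self):
        # scale up: spawn one more child process
        p = Process(target=self.target)
        p.start()
        with self.lock:
            self.workers.append(p)

    def remove_worker(self):
        # scale down: stop and discard one child process
        with self.lock:
            p = self.workers.pop()
        p.terminate()
        p.join()

    def start_watchdog(self, interval=3):
        t = threading.Thread(target=self._watch, args=(interval,))
        t.daemon = True
        t.start()

    def _watch(self, interval):
        # runs in the parent process, so is_alive() works on these children
        while True:
            with self.lock:
                for i, p in enumerate(self.workers):
                    if not p.is_alive():
                        print("%s died, restarting" % p.name)
                        replacement = Process(target=self.target)
                        replacement.start()
                        self.workers[i] = replacement
            time.sleep(interval)

if __name__ == '__main__':
    mgr = WorkerManager(target=consumer)
    for _ in range(4):
        mgr.add_worker()
    mgr.start_watchdog()
    while True:
        time.sleep(10)   # main loop stays free to check queue depth and scale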

dano
  • I'm not using a pool, because my producers aren't doing identical work. But I'll take a look at the source to see if I can borrow some of what it does - looks a bit tricky, however... – knite Oct 08 '14 at 01:18
  • @knite Well, you could still use a `Pool` for that. Not every process in a `multiprocessing.Pool` needs to be doing identical work. It would be tricky to get the auto-scaling you want with `multiprocessing.Pool`, though. – dano Oct 08 '14 at 01:22