I'm currently working on a project that involves three components: an observer that checks for changes in a directory, a worker, and a command-line interface.

What I want to achieve is:

  • When a change happens, the observer sends a string to the worker (i.e. adds a job to the worker's queue).

  • The worker has a queue of jobs and continuously processes it.

  • I also want to be able to run a Python script that checks the status of the worker (number of active jobs, errors, and so on).

I don't know how to achieve this in Python, in terms of which components to use and how to link the three of them.

I thought of a singleton worker where the observer adds jobs to a queue, but 1) I wasn't able to write working code, and 2) how would the status checker fit in?

Another solution I thought of is spawning multiple child processes from a parent that owns the queue, but I'm a bit lost...

Thanks for any advice.

1 Answer

I'd use some kind of observer pattern or publish-subscribe pattern. For the former you could use, for example, RxPY, the Python version of ReactiveX. But for a more basic example, let's stay with the Python standard library: parts of your program subscribe to the worker and receive updates from it, for example via queues.

import itertools as it
from queue import Queue
from threading import Thread
import time


class Observable(Thread):
    """A thread that publishes messages to its subscribers' queues."""

    def __init__(self):
        super().__init__()
        self._observers = []

    def notify(self, msg):
        # Push the message to every subscriber's queue.
        for obs in self._observers:
            obs.put(msg)

    def subscribe(self, obs):
        self._observers.append(obs)


class Observer(Thread):
    """A thread that receives messages through its own queue."""

    def __init__(self):
        super().__init__()
        self.updates = Queue()


class Watcher(Observable):
    """Stands in for the directory observer; emits a new 'job' every second."""

    def run(self):
        for i in it.count():
            self.notify(i)
            time.sleep(1)


class Worker(Observable, Observer):
    """Consumes jobs from its queue and publishes status updates about them."""

    def run(self):
        while True:
            task = self.updates.get()
            self.notify((str(task), 'start'))
            time.sleep(1)  # Simulate doing the actual work.
            self.notify((str(task), 'stop'))


class Supervisor(Observer):
    """Tracks the status of all jobs that are currently in progress."""

    def __init__(self):
        super().__init__()
        self._statuses = {}

    def run(self):
        while True:
            status = self.updates.get()
            print(status)
            self._statuses[status[0]] = status[1]
            # Do something based on status updates.
            if status[1] == 'stop':
                del self._statuses[status[0]]


watcher = Watcher()
worker = Worker()
supervisor = Supervisor()

# Wire up the pipeline: watcher -> worker -> supervisor.
watcher.subscribe(worker.updates)
worker.subscribe(supervisor.updates)

supervisor.start()
worker.start()
watcher.start()
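
When run, this prints one status tuple per event, e.g. ('0', 'start') followed roughly a second later by ('0', 'stop'); the exact interleaving depends on thread timing. Note that all three threads loop forever, so the demo has to be stopped with Ctrl+C.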

However, many variations are possible, and you can check the various patterns to see which one suits you best.
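
For example, if you later want several workers to share the load, note that Observable.notify broadcasts each message to all subscribers, so every worker would receive every job. A minimal sketch of a round-robin variant (the class name is my own choosing, not part of the code above) hands each job to exactly one subscriber instead:

class RoundRobinObservable(Observable):
    """Variation of Observable that distributes messages instead of
    broadcasting them."""

    def __init__(self):
        super().__init__()
        self._next = 0  # Index of the subscriber that gets the next message.

    def notify(self, msg):
        # Hand each message to exactly one subscriber, cycling through them,
        # so multiple workers share the jobs rather than duplicating them.
        if self._observers:
            self._observers[self._next % len(self._observers)].put(msg)
            self._next += 1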

  • Seems like a really good starting point for me. From what I understand, the Worker is the one responsible for consuming the queue, while the Watcher notifies the Worker when something happens. Now, what is the job of the Supervisor? Can I have these in different Python files and launch them in three different terminal sessions? And again, how can I add, for example, another Python script that I can launch manually to check on the status of the queue? – jack87 Jul 10 '18 at 07:17
  • Yes, so the Watcher is basically the one distributing jobs to the registered workers. Every worker consumes its own queue. The job of the Supervisor is to provide regular status updates about the worker(s). This is all started from the same script. If you'd like to launch the supervisor in a separate script in order to observe the workers, then you'd have to look into [inter-process communication tools](https://docs.python.org/2/library/ipc.html) (e.g. via sockets, see [this question](https://stackoverflow.com/q/6920858/3767239); `ZeroMQ` is also a very nice framework; a minimal sketch follows below). – a_guest Jul 10 '18 at 07:43
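
To illustrate the inter-process approach from the last comment, here is a minimal sketch using only the standard library's multiprocessing.connection; the StatusServer class, the port 6000, and the authkey are placeholder choices of mine, not part of the answer above. A small server thread runs next to the supervisor and answers status queries, and a separately launched script connects to it:

from multiprocessing.connection import Listener
from threading import Thread


class StatusServer(Thread):
    """Answers status queries from other processes over a local socket."""

    def __init__(self, supervisor, address=('localhost', 6000)):
        super().__init__(daemon=True)
        self._supervisor = supervisor
        self._address = address

    def run(self):
        with Listener(self._address, authkey=b'demo') as listener:
            while True:
                with listener.accept() as conn:
                    # Send a snapshot of the current job statuses. (A real
                    # application would guard _statuses with a lock.)
                    conn.send(dict(self._supervisor._statuses))

Start it alongside the other threads with StatusServer(supervisor).start(). The manually launched checker script is then just a few lines:

from multiprocessing.connection import Client

with Client(('localhost', 6000), authkey=b'demo') as conn:
    print(conn.recv())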