0

I am developping a BaseServer abstract class:

class BaseServer(abc.ABC):

    @abc.abstractmethod
    def serve(self):
        raise NotImplementedError

    @abc.abstractmethod
    def shutdown(self):
        raise NotImplementedError

    @abc.abstractmethod
    def send(self, *args, **kwargs):
        raise NotImplementedError

    @abc.abstractmethod
    def finalise(self, *args, **kwargs):
        raise NotImplementedError

    @abc.abstractmethod
    def is_serving(self):
        raise NotImplementedError

    def set_handler(self, handler):
        self.__handler = handler

    def _handle(self, *args, **kwargs):
        try:
            self.__handler.handle(self, *args, **kwargs)
        except AttributeError:
            pass

    def _cleanup(self):
        pass

as well as ThreadPoolMixin and ProcessPoolMixin mixin classes (inspired from Python's socketserver module):

class ThreadPoolMixin:

    def _handle(self, *args, **kwargs):
        try:
            future = self.__pool.submit(super()._handle, *args, **kwargs)
            self.__futures.append(future)
        except AttributeError:
            self.__pool = concurrent.futures.ThreadPoolExecutor(1)
            self.__futures = []
            future = self.__pool.submit(super()._handle, *args, **kwargs)
            self.__futures.append(future)

    def _cleanup(self):
        try:
            for future in self.__futures:
                future.cancel()

            self.__pool.shutdown()
        except AttributeError:
            pass


class ProcessPoolMixin:

    def _handle(self, *args, **kwargs):
        try:
            future = self.__pool.submit(super()._handle, *args, **kwargs)
            self.__futures.append(future)
        except AttributeError:
            self.__pool = concurrent.futures.ProcessPoolExecutor(1)
            self.__futures = []
            future = self.__pool.submit(super()._handle, *args, **kwargs)
            self.__futures.append(future)

    def _cleanup(self):
        try:
            for future in self.__futures:
                future.cancel()

            self.__pool.shutdown()
        except AttributeError:
            pass

    def __getstate__(self):
        class Temp:
            pass

        state = self.__dict__.copy()
        temp = Temp()
        temp.__dict__ = state

        try:
            del temp.__pool
            del temp.__futures
        except AttributeError:
            pass

        return temp.__dict__

These classes allow me to create concrete subclasses according to my needs:

class HTTPServer(BaseServer):
    pass


class AMQPServer(BaseServer):
    pass


class ThreadingHTTPServer(ThreadPoolMixin, BaseServer):
    pass


class ThreadingAMQPServer(ThreadPoolMixin, BaseServer):
    pass


class ProcessingHTTPServer(ProcessPoolMixin, BaseServer):
    pass


class ProcessingAMQPServer(ProcessPoolMixin, BaseServer):
    pass

How can I customize the thread pool size and process pool size from these subclasses?

Géry Ogam
  • 6,336
  • 4
  • 38
  • 67

2 Answers2

1

I have figured out a solution: using a public class attribute, like in socketserver.ThreadingMixIn and socketserver.ForkingMixIn.

class ThreadPoolMixin:
    pool_size = 1

    def _handle(self, *args, **kwargs):
        try:
            future = self.__pool.submit(super()._handle, *args, **kwargs)
            self.__futures.append(future)
        except AttributeError:
            self.__pool = concurrent.futures.ThreadPoolExecutor(
                self.pool_size)
            self.__futures = []
            future = self.__pool.submit(super()._handle, *args, **kwargs)
            self.__futures.append(future)

    def _cleanup(self):
        try:
            for future in self.__futures:
                future.cancel()

            self.__pool.shutdown()
        except AttributeError:
            pass


class ThreadingHTTPServer(ThreadPoolMixin, BaseServer):
    pool_size = 4


class ThreadingAMQPServer(ThreadPoolMixin, BaseServer):
    pool_size = 2
Géry Ogam
  • 6,336
  • 4
  • 38
  • 67
1

Another way I'm experimenting with is a "mixin factory" that creates new mixin classes as needed:

def thread_pool_mixin(pool_size: int) -> Type:
    class ThreadPoolMixin:
        def _handle(self, *args, **kwargs):
            . . .
            # Use the closed-over "pool_size"
            self.__pool = concurrent.futures.ThreadPoolExecutor(pool_size)
            . . .
    return ThreadPoolMixin


class ThreadingHTTPServer(thread_pool_mixin(pool_size=4), BaseServer):
    pass

This seems cleaner than muddying the class attributes; although that seems to be the common approach.

Carcigenicate
  • 43,494
  • 9
  • 68
  • 117
  • Nice alternative, thanks. I don’t know if closure variables are cleaner than class variables, do you have any references where this approach is used? – Géry Ogam Apr 08 '22 at 21:30
  • 1
    @Maggyero Nope. I think this is cleaner though because Django's habit of "dump everything as public class variables" with mixins seems like if abused long enough, it would lead to naming clashes. Closures avoid namespace pollution, which I think is a valid reason to use this approach. If the data shouldn't be relied on by any other code, it doesn't seem like a good idea to even allow that. I am curious what the consensus on this is though. – Carcigenicate Apr 08 '22 at 21:58
  • I think that we are defining a [dependent type](https://stackoverflow.com/a/9374698/2326961). – Géry Ogam Apr 08 '22 at 23:05