25

Python's futures package allows us to enjoy ThreadPoolExecutor and ProcessPoolExecutor for doing tasks in parallel.

However, for debugging it is sometimes useful to temporarily replace the true parallelism with a dummy one, which carries out the tasks in a serial way in the main thread, without spawning any threads or processes.

Is there anywhere an implementation of a DummyExecutor?

dano
  • 91,354
  • 19
  • 222
  • 219
Ram Rachum
  • 84,019
  • 84
  • 236
  • 374
  • @mata I don't think so, that would create one thread which will still be separate from the main thread. – Ram Rachum May 03 '12 at 17:13
  • of course you're right. but then it should't be too complicated to implement an `Executor` which on submit directly calls the callable and returns a `Future` object. A look a the [`ThreadPoolExecutor`](http://code.google.com/p/pythonfutures/source/browse/trunk/concurrent/futures/thread.py#98) might help – mata May 03 '12 at 17:34
  • 1
    It always looks simple before you do it, but not always after you do it. If someone already implemented this, it's much preferable that I use their ready implementation. – Ram Rachum May 03 '12 at 17:36

2 Answers2

24

Something like this should do it:

from concurrent.futures import Future, Executor
from threading import Lock


class DummyExecutor(Executor):

    def __init__(self):
        self._shutdown = False
        self._shutdownLock = Lock()

    def submit(self, fn, *args, **kwargs):
        with self._shutdownLock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = Future()
            try:
                result = fn(*args, **kwargs)
            except BaseException as e:
                f.set_exception(e)
            else:
                f.set_result(result)

            return f

    def shutdown(self, wait=True):
        with self._shutdownLock:
            self._shutdown = True


if __name__ == '__main__':

    def fnc(err):
        if err:
            raise Exception("test")
        else:
            return "ok"

    ex = DummyExecutor()
    print(ex.submit(fnc, True))
    print(ex.submit(fnc, False))
    ex.shutdown()
    ex.submit(fnc, True) # raises exception

locking is probably not needed in this case, but can't hurt to have it.

mata
  • 67,110
  • 10
  • 163
  • 162
4

Use this to mock your ThreadPoolExecutor

class MockThreadPoolExecutor():
    def __init__(self, **kwargs):
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        pass

    def submit(self, fn, *args, **kwargs):
        # execute functions in series without creating threads
        # for easier unit testing
        result = fn(*args, **kwargs)
        return result

    def shutdown(self, wait=True):
        pass

if __name__ == "__main__":
    def sum(a, b):
        return a + b

    with MockThreadPoolExecutor(max_workers=3) as executor:
        future_result = list()
        for i in range(5):
            future_result.append(executor.submit(sum, i + 1, i + 2))
Adhithiya Raghu
  • 145
  • 1
  • 4
  • This is a good solution, but I'm using this to test code then does something with the `Future` objects returned by `ThreadPoolExecutor.submit`. My solution was to also add a small `MockFuture` class with a `result` method, and have `submit` return an instance of that rather than the result directly. – Jack Brounstein May 06 '22 at 17:24