2

I'm wondering how to implement a task-manager that restarts failed tasks in Python.

Here's what I've come up with after some thinking, but it seems like a hack. Is there a better way to achieve this "self-healing" task group pattern?

import asyncio, random

async def noreturn(_arg):
    while True:
        await asyncio.sleep(1)
        
        if random.randint(0, 10) % 10 == 0:
            raise random.choice((RuntimeError, ValueError, TimeoutError))


async def main():
    taskmap: dict[int, asyncio.Task] = {}

    for i in range(10):
        taskmap[i] = asyncio.create_task(noreturn(i))
    
    while True:
        for arg, task in taskmap.items():
            if task.done():
                # Task died
                taskmap[arg] = asyncio.create_task(noreturn(arg))

        await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())

Thanks in advance for your help.

Joe Boyle
  • 169
  • 2
  • 7
  • essentially, you can just `try`- `except` -`pass` in `noreturn` coroutine. Whatever external re-running workaround would essentially do the same – RomanPerekhrest Mar 15 '23 at 17:20

2 Answers2

1

One solution might be to create a decorator, that reruns the function when it ends/throws exception:

import asyncio, random

def rerun_on_exception(f):
    async def _fn(*arg, **kwargs):
        while True:
            try:
                return await f(*arg, **kwargs)
            except:
                print('Reruning...')
    return _fn


@rerun_on_exception
async def noreturn(_arg):
    while True:
        await asyncio.sleep(1)

        if random.randint(0, 10) % 10 == 0:
            print(_arg)
            raise random.choice((RuntimeError, ValueError, TimeoutError))


async def main():
    taskmap: dict[int, asyncio.Task] = {}

    for i in range(10):
        taskmap[i] = asyncio.create_task(noreturn(i))

    # wait indefinitely:
    while True:
        await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())
jsbueno
  • 99,910
  • 10
  • 151
  • 209
Andrej Kesely
  • 168,389
  • 15
  • 48
  • 91
1

This will work as is, and if your core-thing is simple enough that this can fit in, just go as is.

The major problem with this approach is that it is "hardcoded" i.e.: your checking loop has all the information needed on the task that died, and can recreate it, by calling the same co-routine with the same parameters again.

In a larger system, one would expect you to have several mixed tasks, and not always have their initial parameters at hand to recreate a task when needed.

So, a pattern that could work better for you is to have an itermediate layer, which will hold the initial parameters to a task - or better yet, to its inner co-routine function, and then re-create the asyncio-task as needed whenever it fails.

This layer can be instrumented with observability (i.e. generate logs on fail, retry), retry attempts, retry interval, etc.. as you need.

You can inherit from asyncio.task, and write a wrapping code and set it as the task_factory in the running loop. Unfortunately, you won't be able to simply instantiate your classes as usual (asyncio.create_task), even customizing the task_factory because that takes a co-routine alreadt created - wherever you need to take note of your co-routine parameters so you can re-create the underlying co-routine in case of failure.

The code could be written along the example bellow. If this is critical for production, there may be edge cases not covered, and I'd advise you to contact a specialist to get you production-strength code for this. Nonetheless this should just work:

    
class RetrieableTask:   #(asyncio.Task):
    retiable = RuntimeError, ValueError, # ...
    
    def __init__(self, coroutine_function, args=(), kwargs=None, name=None, context=None, name=None,... ): # retry extrategies can be parametrized
        self.coroutine_function = coroutine_function
        self.args = args 
        self.kwargs = kwargs or {}
        self.context = context
        self.name = name
        self.start_task()
        
    def start_task(self):
        self.inner = asyncio.create_task(self.coroutine_function(*self.args, self.**kwargs)
                                         context=self.context)
        self.inner.set_name(self.name)
        
    
    def done(self):
        result = self.inner..done()
        if result:
            exception = self.inner.exception() # may raise CancelledError: just let it bubble through
            if exception and isinstance(exception, self.retriable):
                # if needed log, and check retry policies
                self.start_task()
                return False
    # bridge other task methods to the inner task as needed:
    def result(self):
        return self.inner.result()
    def exception(self):
        return self.inner.exception()
    def cancel(self, msg=None):
        return self.inner.cancel(msg)
    
    def set_name(self, name):
        self.name = name
        self.inner.set_name(name)
    def get_name(self):
        return self.name
    # repeat proxying of methods as needed for
    # the methods documented in https://docs.python.org/3/library/asyncio-task.html#task-object
    

And this is how it can be used:

async def noreturn(_arg):
    while True:
        await asyncio.sleep(1)

        if random.randint(0, 10) % 10 == 0:
            raise random.choice((RuntimeError, ValueError, TimeoutError))


async def main():
    tasks = []

    for i in range(10):
        tasks.append(RetriableTask(noreturn, args=(i,))

    while any(not task.done() for task in tasks):
        await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())
jsbueno
  • 99,910
  • 10
  • 151
  • 209