
I have some resources which I have wrapped in a context manager class.

class Resource:
    def __init__(self, res):
        print(f'allocating resource {res}')
        self.res = res

    def __enter__(self):
        return self.res

    def __exit__(self, typ, value, traceback):
        print(f'freed resource {self.res}')

    def __str__(self):
        return f'{self.res}'

If I were to use 2 resources directly, I could use the following syntax:

with Resource('foo') as a, Resource('bar') as b:
    print(f'doing something with resource a({a})')
    print(f'doing something with resource b({b})')

This works as expected:

allocating resource foo
allocating resource bar
doing something with resource a(foo)
doing something with resource b(bar)
freed resource bar
freed resource foo

What I'd like to do, however, is wrap the use of these multiple resources into a class Task, and make that itself a context manager.

This is my first attempt at creating such a Task class which manages 2 resources:

class Task:
    def __init__(self, res1, res2):
        self.a = Resource(res1)
        self.b = Resource(res2)

    def __enter__(self):
        return self

    def __exit__(self, typ, value, traceback):
        self.b.__exit__(typ, value, traceback)
        self.a.__exit__(typ, value, traceback)

    def run(self):
        print(f'running task with resource {self.a} and {self.b}')

I can now use the familiar syntax:

with Task('foo', 'bar') as t:
    t.run()

And again this works as expected:

allocating resource foo
allocating resource bar
running task with resource foo and bar
freed resource bar
freed resource foo

This all works fine, except when an exception is thrown trying to free one of the resources.

To illustrate I have modified my Resource class to throw an exception for one of the resources:

class Resource:
    def __init__(self, res):
        print(f'allocating resource {res}')
        self.res = res

    def __enter__(self):
        return self.res

    def __exit__(self, typ, value, traceback):
        print(f'try free resource {self.res}')
        if self.res == 'bar':
            raise RuntimeError(f'error freeing {self.res} resource')
        print(f'freed resource {self.res}')

    def __str__(self):
        return f'{self.res}'

Using the 2 resources directly, as before:

try:
    with Resource('foo') as a, Resource('bar') as b:
        print(f'doing something with resource a({a})')
        print(f'doing something with resource b({b})')
except RuntimeError:
    pass

In the face of the exception freeing bar, foo is still freed:

allocating resource foo
allocating resource bar
doing something with resource a(foo)
doing something with resource b(bar)
try free resource bar
try free resource foo
freed resource foo
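
This behaviour follows from how the with statement is defined: a with statement with several items is processed as if the items were nested with statements, so the outer manager's __exit__ runs even when the inner one raises. A minimal sketch of that equivalence, assuming a stand-in Resource that records into a hypothetical `events` list instead of printing:

```python
events = []  # hypothetical log used in place of print()

class Resource:
    """Stand-in for the Resource class above; freeing 'bar' fails."""
    def __init__(self, res):
        self.res = res

    def __enter__(self):
        return self.res

    def __exit__(self, typ, value, traceback):
        events.append(f'try free resource {self.res}')
        if self.res == 'bar':
            raise RuntimeError(f'error freeing {self.res} resource')
        events.append(f'freed resource {self.res}')

try:
    # Nested form of: with Resource('foo') as a, Resource('bar') as b:
    with Resource('foo') as a:
        with Resource('bar') as b:
            events.append(f'using {a} and {b}')
except RuntimeError as exc:
    # The error from freeing bar still reaches the caller
    events.append(f'caught: {exc}')
```

The exception raised while freeing bar passes through foo's __exit__ on its way out, which is why foo is still freed.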

However, doing the same with Task leaks foo, because the exception raised while freeing bar propagates before self.a.__exit__ is called:

try:
    with Task('foo', 'bar') as t:
        t.run()
except RuntimeError:
    pass

The output shows that I never try to free foo:

allocating resource foo
allocating resource bar
running task with resource foo and bar
try free resource bar

Questions:

  • I could wrap each explicit call to Resource.__exit__ in a try/except block. But if I want to propagate the exception further up the call stack while still freeing all the resources, I have to keep track of every exception and re-raise them afterwards, which feels incorrect.

  • It also feels "dirty" calling Resource.__exit__ explicitly from Task.__exit__, rather than having the with statement call it implicitly for me. Is there a way to use a context manager inside a class like I'm trying to do?

What is the right way to handle multiple resources inside a single context manager?

Steve Lorimer

1 Answer


As mentioned in the comments, contextlib.ExitStack does exactly this.

The documentation describes it as "a context manager that is designed to make it easy to programmatically combine other context managers".

You can simply inherit from ExitStack and call enter_context for each resource you want managed:

import contextlib

class Task(contextlib.ExitStack):
    def __init__(self, res1, res2):
        super().__init__()
        self.a = self.enter_context(Resource(res1))
        self.b = self.enter_context(Resource(res2))

    def run(self):
        print(f'running task with resource {self.a} and {self.b}')

Note there is no need to define your own __enter__ and __exit__ methods, as ExitStack provides them.

Using it as in the example:

try:
    with Task('foo', 'bar') as t:
        t.run()
except RuntimeError:
    pass

Now when the exception is thrown freeing bar, foo is still freed:

allocating resource foo
allocating resource bar
running task with resource foo and bar
try free resource bar
try free resource foo
freed resource foo
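
The exception is not swallowed either: ExitStack runs the remaining cleanups and then re-raises, so it can still be handled further up the call stack. A self-contained sketch that checks this, assuming the failing Resource from the question with print replaced by a hypothetical `log` list so the order can be inspected:

```python
import contextlib

log = []  # hypothetical log used in place of print()

class Resource:
    """The question's failing Resource, logging to a list."""
    def __init__(self, res):
        log.append(f'allocating resource {res}')
        self.res = res

    def __enter__(self):
        return self.res

    def __exit__(self, typ, value, traceback):
        log.append(f'try free resource {self.res}')
        if self.res == 'bar':
            raise RuntimeError(f'error freeing {self.res} resource')
        log.append(f'freed resource {self.res}')

class Task(contextlib.ExitStack):
    def __init__(self, res1, res2):
        super().__init__()
        # enter_context registers each __exit__ for LIFO cleanup
        self.a = self.enter_context(Resource(res1))
        self.b = self.enter_context(Resource(res2))

caught = None
try:
    with Task('foo', 'bar') as t:
        pass
except RuntimeError as exc:
    # Raised by bar's __exit__, re-raised after foo was freed
    caught = exc
```

After this runs, `caught` holds the RuntimeError from freeing bar, and `log` shows that foo was freed before the exception left the with block.
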
Steve Lorimer
  • I see one problem: what if an exception is raised in __init__ after some of the resources have already been created/opened? `__exit__` will not be called. Possible solution [here](https://stackoverflow.com/a/51788906/13782669) – alex_noname Mar 01 '23 at 08:01
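
One way to address the concern in this comment (not necessarily the approach in the linked answer) is the pattern from the contextlib documentation: acquire the resources in __enter__ inside a temporary ExitStack, and call pop_all() only once everything has succeeded. The sketch below is illustrative and makes some assumptions: acquisition happens in Resource.__enter__ rather than __init__, acquiring 'bar' is made to fail, and `log`, `_names` and `_stack` are hypothetical names.

```python
import contextlib

log = []  # hypothetical log used in place of print()

class Resource:
    """Hypothetical variant: acquiring 'bar' fails in __enter__."""
    def __init__(self, res):
        self.res = res

    def __enter__(self):
        if self.res == 'bar':
            raise RuntimeError(f'error allocating {self.res} resource')
        log.append(f'allocated resource {self.res}')
        return self.res

    def __exit__(self, typ, value, traceback):
        log.append(f'freed resource {self.res}')

class Task:
    def __init__(self, res1, res2):
        self._names = (res1, res2)  # nothing is acquired yet
        self._stack = None

    def __enter__(self):
        with contextlib.ExitStack() as stack:
            self.a = stack.enter_context(Resource(self._names[0]))
            self.b = stack.enter_context(Resource(self._names[1]))
            # Everything succeeded: move the cleanups to a new stack
            # so leaving this with block does not free them.
            self._stack = stack.pop_all()
        return self

    def __exit__(self, typ, value, traceback):
        return self._stack.__exit__(typ, value, traceback)

try:
    with Task('foo', 'bar') as t:
        log.append('running')
except RuntimeError as exc:
    log.append(f'caught: {exc}')
```

Here foo is freed by the temporary stack when acquiring bar fails, the body of the with statement never runs, and the error still reaches the caller.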