22

I am using a Python 3 sequence like this:

lock = threading.Lock()
res = lock.acquire(timeout=10)
if res:
    # do something ....
    lock.release()
else:
    # do something else ...

I would prefer to use a with-statement instead of explicit "acquire" and "release", but I don't know how to get the timeout effect.

Zero Piraeus
  • 56,143
  • 27
  • 150
  • 160
Tsf
  • 1,339
  • 5
  • 17
  • 29

3 Answers3

23

You can do this pretty easily with a context manager:

import threading
from contextlib import contextmanager

@contextmanager
def acquire_timeout(lock, timeout):
    result = lock.acquire(timeout=timeout)
    try:
        yield result
    finally:
        if result:
            lock.release()


# Usage:
lock = threading.Lock()

with acquire_timeout(lock, 2) as acquired:
    if acquired:
        print('got the lock')
        # do something ....
    else:
        print('timeout: lock not available')
        # do something else ...

*Note: This won't work in Python 2.x since there isn't a timeout argument to Lock.acquire

robbles
  • 2,729
  • 1
  • 23
  • 30
  • this is working for me, thanks. But can you explain me why is it neccesary to use `yield` when using `with` in this case? –  Feb 21 '19 at 21:03
  • 2
    @monkjuice Inside a context manager, the `yield` statement is where control is passed back to the `with` block provided by the caller. So `yield result` puts the value of `result` into the `acquired` variable and runs the indented block below `with acquire_timeout...` and continues inside the context manager once it returns. – robbles Feb 23 '19 at 00:45
  • 1
    The `acquire_timeout` function in this answer is wrong; it should release the lock in a `finally` block with the `yield result` in the `try`. That way the lock will still be released when an exception is raised out of the code that runs in the context manager. – Christopher Armstrong May 17 '22 at 20:04
14

Slightly nicer version:

import threading
from contextlib import contextmanager


class TimeoutLock(object):
    def __init__(self):
        self._lock = threading.Lock()

    def acquire(self, blocking=True, timeout=-1):
        return self._lock.acquire(blocking, timeout)

    @contextmanager
    def acquire_timeout(self, timeout):
        result = self._lock.acquire(timeout=timeout)
        yield result
        if result:
            self._lock.release()

    def release(self):
        self._lock.release()

# Usage:
lock = TimeoutLock()

with lock.acquire_timeout(3) as result:
    if result:
        print('got the lock')
        # do something ....
    else:
        print('timeout: lock not available')
        # do something else ...

It appears you can't subclass threading.Lock, so I had to make a wrapper class instead.

robbles
  • 2,729
  • 1
  • 23
  • 30
0

This is @robble's code. I just added usage/example:

from datetime import datetime
import time
from queue import Queue
from threading import Thread
    
def _log(msg : str):
    print(f"{datetime.utcnow()} {msg}")

import threading
from contextlib import contextmanager
from typing import TypeVar

class TimeoutLock(object):
    def __init__(self, timeout_sec = -1):
        self._lock = threading.Lock()
        self.timeout_sec = timeout_sec

    @contextmanager
    def acquire_timeout(self):
        result = self._lock.acquire(timeout=self.timeout_sec)
        yield result
        if result:
            self._lock.release()

def producer(name, q, delay_sec):
    try:
        i : int = 0
        while True:
            q.put(i)
            _log(f"{name} {i}")
            time.sleep(delay_sec)
            i = i + 1

    except Exception as e:
        err_msg = f"{name} error: {str(e)}"
        _log(err_msg)
        raise

def consumer(name, q, lock, delay_sec):
    while True:
        with lock.acquire_timeout() as acquired:
            if acquired:
                i = q.get()
                _log(f'{name} {i}')
                time.sleep(delay_sec)
            else:
                _log(f"{name} wait timeout'ed")

try:
    q = Queue()
    lock = TimeoutLock(timeout_sec=3)

    consumer1_thread = Thread(target = consumer, args =('consumer1', q, lock, 5 ))
    consumer2_thread = Thread(target = consumer, args =('consumer2', q, lock, 5 ))
    producer1_thread = Thread(target = producer, args =('producer1', q, 1 ))
    producer1_thread.start()
    consumer1_thread.start()
    time.sleep(5)
    consumer2_thread.start()
    

    q.join()
except Exception as e:
        err_msg = f"main thread error: {str(e)}"
        _log(err_msg)
finally:
    _log(f'main thread done!')
user3761555
  • 851
  • 10
  • 21