35

Context

I recently posted a timer class for review on Code Review. I'd had a gut feeling there were concurrency bugs as I'd once seen 1 unit test fail, but was unable to reproduce the failure. Hence my post to code review.

I got some great feedback highlighting various race conditions in the code. (I thought) I understood the problem and the solution, but before making any fixes, I wanted to expose the bugs with a unit test. When I tried, I realised it was difficult. Various stack exchange answers suggested I'd have to control the execution of threads to expose the bug(s) and any contrived timing would not necessarily be portable to a different machine. This seemed like a lot of accidental complexity beyond the problem I was trying to solve.

Instead I tried using the best static analysis (SA) tool for python, PyLint, to see if it'd pick out any of the bugs, but it couldn't. Why could a human find the bugs through code review (essentially SA), but a SA tool could not?

Afraid of trying to get Valgrind working with python (which sounded like yak-shaving), I decided to have a bash at fixing the bugs without reproducing them first. Now I'm in a pickle.

Here's the code now.

from threading import Timer, Lock
from time import time

class NotRunningError(Exception): pass
class AlreadyRunningError(Exception): pass


class KitchenTimer(object):
    '''
    Loosely models a clockwork kitchen timer with the following differences:
        You can start the timer with arbitrary duration (e.g. 1.2 seconds).
        The timer calls back a given function when time's up.
        Querying the time remaining has 0.1 second accuracy.
    '''

    PRECISION_NUM_DECIMAL_PLACES = 1
    RUNNING = "RUNNING"
    STOPPED = "STOPPED"
    TIMEUP  = "TIMEUP"

    def __init__(self):
        self._stateLock = Lock()
        with self._stateLock:
            self._state = self.STOPPED
            self._timeRemaining = 0

    def start(self, duration=1, whenTimeup=None):
        '''
        Starts the timer to count down from the given duration and call whenTimeup when time's up.
        '''
        with self._stateLock:
            if self.isRunning():
                raise AlreadyRunningError
            else:
                self._state = self.RUNNING
                self.duration = duration
                self._userWhenTimeup = whenTimeup
                self._startTime = time()
                self._timer = Timer(duration, self._whenTimeup)
                self._timer.start()

    def stop(self):
        '''
        Stops the timer, preventing whenTimeup callback.
        '''
        with self._stateLock:
            if self.isRunning():
                self._timer.cancel()
                self._state = self.STOPPED
                self._timeRemaining = self.duration - self._elapsedTime()
            else:
                raise NotRunningError()

    def isRunning(self):
        return self._state == self.RUNNING

    def isStopped(self):
        return self._state == self.STOPPED

    def isTimeup(self):
        return self._state == self.TIMEUP

    @property
    def timeRemaining(self):
        if self.isRunning():
            self._timeRemaining = self.duration - self._elapsedTime()
        return round(self._timeRemaining, self.PRECISION_NUM_DECIMAL_PLACES)

    def _whenTimeup(self):
        with self._stateLock:
            self._state = self.TIMEUP
            self._timeRemaining = 0
            if callable(self._userWhenTimeup):
                self._userWhenTimeup()

    def _elapsedTime(self):
        return time() - self._startTime

Question

In the context of this code example, how can I expose the race conditions, fix them, and prove they're fixed?

Extra points

extra points for a testing framework suitable for other implementations and problems rather than specifically to this code.

Takeaway

My takeaway is that the technical solution to reproduce the identified race conditions is to control the synchronism of two threads to ensure they execute in the order that will expose a bug. The important point here is that they are already identified race conditions. The best way I've found to identify race conditions is to put your code up for code review and encourage more expert people analyse it.

Community
  • 1
  • 1
doughgle
  • 827
  • 1
  • 9
  • 18
  • 4
    `PyLint` knows nothing about threads - that's why it didn't help. In general, you're addressing *very hard* problems here. [Follow the references here](http://stackoverflow.com/questions/74391/proving-correctness-of-multithread-algorithms) and you'll discover they're not of much help :-( – Tim Peters Oct 26 '13 at 03:59
  • Very hard in general, doesn't mean impossible though, right? I'm looking for an answer specific to this example. So far the only way I managed to detect race conditions is through code review. But will it detect them reliably? And is there a faster way to find out if I've fixed (or introduced) a concurrency bug? – doughgle Oct 28 '13 at 02:52
  • The better way is to code defensively, in this case to synchronize the timeRemaining method as well, as @perreal suggests. As far as tests go, brute force is a quite solid option. – flup Nov 25 '13 at 10:59
  • Generally it's best to think it through and make sure that there are no race conditions since those are really hard to detect (may happen only one in a million times you run the program). – mb21 Nov 26 '13 at 22:05
  • Maybe `before_after` can help: http://www.oreills.co.uk/2015/03/01/testing-race-conditions-in-python.html – Jérôme Jul 18 '16 at 14:35

4 Answers4

11

Traditionally, forcing race conditions in multithreaded code is done with semaphores, so you can force a thread to wait until another thread has achieved some edge condition before continuing.

For example, your object has some code to check that start is not called if the object is already running. You could force this condition to make sure it behaves as expected by doing something like this:

  • starting a KitchenTimer
  • having the timer block on a semaphore while in the running state
  • starting the same timer in another thread
  • catching AlreadyRunningError

To do some of this you may need to extend the KitchenTimer class. Formal unit tests will often use mock objects which are defined to block at critical times. Mock objects are a bigger topic than I can address here, but googling "python mock object" will turn up a lot of documentation and many implementations to choose from.

Here's a way that you could force your code to throw AlreadyRunningError:

import threading

class TestKitchenTimer(KitchenTimer):

    _runningLock = threading.Condition()

    def start(self, duration=1, whenTimeUp=None):
        KitchenTimer.start(self, duration, whenTimeUp)
        with self._runningLock:
            print "waiting on _runningLock"
            self._runningLock.wait()

    def resume(self):
        with self._runningLock:
            self._runningLock.notify()

timer = TestKitchenTimer()

# Start the timer in a subthread. This thread will block as soon as
# it is started.
thread_1 = threading.Thread(target = timer.start, args = (10, None))
thread_1.start()

# Attempt to start the timer in a second thread, causing it to throw
# an AlreadyRunningError.
try:
    thread_2 = threading.Thread(target = timer.start, args = (10, None))
    thread_2.start()
except AlreadyRunningError:
    print "AlreadyRunningError"
    timer.resume()
    timer.stop()

Reading through the code, identify some of the boundary conditions you want to test, then think about where you would need to pause the timer to force that condition to arise, and add Conditions, Semaphores, Events, etc. to make it happen. e.g. what happens if, just as the timer runs the whenTimeUp callback, another thread tries to stop it? You can force that condition by making the timer wait as soon as it's entered _whenTimeUp:

import threading

class TestKitchenTimer(KitchenTimer):

    _runningLock = threading.Condition()

    def _whenTimeup(self):
        with self._runningLock:
            self._runningLock.wait()
        KitchenTimer._whenTimeup(self)

    def resume(self):
        with self._runningLock:
            self._runningLock.notify()

def TimeupCallback():
    print "TimeupCallback was called"

timer = TestKitchenTimer()

# The timer thread will block when the timer expires, but before the callback
# is invoked.
thread_1 = threading.Thread(target = timer.start, args = (1, TimeupCallback))
thread_1.start()
sleep(2)

# The timer is now blocked. In the parent thread, we stop it.
timer.stop()
print "timer is stopped: %r" % timer.isStopped()

# Now allow the countdown thread to resume.
timer.resume()

Subclassing the class you want to test isn't an awesome way to instrument it for testing: you'll have to override basically all of the methods in order to test race conditions in each one, and at that point there's a good argument to be made that you're not really testing the original code. Instead, you may find it cleaner to put the semaphores right in the KitchenTimer object but initialized to None by default, and have your methods check if testRunningLock is not None: before acquiring or waiting on the lock. Then you can force races on the actual code that you're submitting.

Some reading on Python mock frameworks that may be helpful. In fact, I'm not sure that mocks would be helpful in testing this code: it's almost entirely self-contained and doesn't rely on many external objects. But mock tutorials sometimes touch on issues like these. I haven't used any of these, but the documentation on these like a good place to get started:

Tim Pierce
  • 5,514
  • 1
  • 15
  • 31
  • I can force the code to throw AlreadyRunningError just by calling start twice in quick succession. – doughgle Nov 27 '13 at 09:52
  • Can you give an example to reproduce "what happens if, just as the timer runs the whenTimeUp callback, another thread tries to stop it?" That's one of the race conditions originally identified in the code review and one that I'm struggling to reproduce. – doughgle Nov 27 '13 at 09:54
  • 1
    "in quick succession" approach is no different from the brute force approach of "running many threads". Since it relies in the chance that two close-by statements are executed in a very short delay in between. I agree that in general you can't really produce general race condition. If analytically you have discovered that it might encounter a race condition, then it will be. There is no need to test (since you are already testing it in your head). – justhalf Nov 27 '13 at 09:57
  • @justhalf sorry for the confusion caused by "quick succession". I mean, if I start() one KitchenTimer with 10 second duration and I try to start() it again within that duration, I should get AlreadyRunningError. This is the external behaviour of the timer. It should happen regardless of whether start() is called in the same or different thread. – doughgle Nov 27 '13 at 10:04
  • Sure, just edited the answer to include an example for exposing the race condition between _whenTimeup() and stop(). Also fixed a bug in my original example, oops. – Tim Pierce Nov 27 '13 at 14:02
  • As for the "quick succession" point: you're right that just starting two threads one immediately after the other will almost always invoke the error, but a strict code reviewer would point out that you have no _guarantee_ that the second thread started within 10 seconds of the first. The semaphores allow you to provide that guarantee. A really diligent test would not even use `sys.sleep` as I did here, but would use another semaphore to have the parent thread wait until the timer thread entered its critical section. – Tim Pierce Nov 27 '13 at 14:03
9

The most common solution to testing thread (un)safe code is to start a lot of threads and hope for the best. The problem I, and I can imagine others, have with this is that it relies on chance and it makes tests 'heavy'.

As I ran into this a while ago I wanted to go for precision instead of brute force. The result is a piece of test code to cause race-conditions by letting the threads race neck to neck.

Sample racey code

spam = []

def set_spam():
    spam[:] = foo()
    use(spam)

If set_spam is called from several threads, a race condition exists between modification and use of spam. Let's try to reproduce it consistently.

How to cause race-conditions

class TriggeredThread(threading.Thread):
    def __init__(self, sequence=None, *args, **kwargs):
        self.sequence = sequence
        self.lock = threading.Condition()
        self.event = threading.Event()
        threading.Thread.__init__(self, *args, **kwargs)

    def __enter__(self):
        self.lock.acquire()
        while not self.event.is_set():
            self.lock.wait()
        self.event.clear()

    def __exit__(self, *args):
        self.lock.release()
        if self.sequence:
            next(self.sequence).trigger()

    def trigger(self):
        with self.lock:
            self.event.set()
            self.lock.notify()

Then to demonstrate the use of this thread:

spam = []  # Use a list to share values across threads.
results = []  # Register the results.

def set_spam():
    thread = threading.current_thread()
    with thread:  # Acquires the lock.
        # Set 'spam' to thread name
        spam[:] = [thread.name]
    # Thread 'releases' the lock upon exiting the context.
    # The next thread is triggered and this thread waits for a trigger.
    with thread:
        # Since each thread overwrites the content of the 'spam'
        # list, this should only result in True for the last thread.
        results.append(spam == [thread.name])

threads = [
    TriggeredThread(name='a', target=set_spam),
    TriggeredThread(name='b', target=set_spam),
    TriggeredThread(name='c', target=set_spam)]

# Create a shifted sequence of threads and share it among the threads.
thread_sequence = itertools.cycle(threads[1:] + threads[:1])
for thread in threads:
    thread.sequence = thread_sequence

# Start each thread
[thread.start() for thread in threads]
# Trigger first thread.
# That thread will trigger the next thread, and so on.
threads[0].trigger()
# Wait for each thread to finish.
[thread.join() for thread in threads]
# The last thread 'has won the race' overwriting the value
# for 'spam', thus [False, False, True].
# If set_spam were thread-safe, all results would be true.
assert results == [False, False, True], "race condition triggered"
assert results == [True, True, True], "code is thread-safe"

I think I explained enough about this construction so you can implement it for your own situation. I think this fits the 'extra points' section quite nicely:

extra points for a testing framework suitable for other implementations and problems rather than specifically to this code.

Solving race-conditions

Shared variables

Each threading issue is solved in it's own specific way. In the example above I caused a race-condition by sharing a value across threads. Similar problems can occur when using global variables, such as a module attribute. The key to solving such issues may be to use a thread-local storage:

# The thread local storage is a global.
# This may seem weird at first, but it isn't actually shared among threads.
data = threading.local()
data.spam = []  # This list only exists in this thread.
results = []  # Results *are* shared though.

def set_spam():
    thread = threading.current_thread()
    # 'get' or set the 'spam' list. This actually creates a new list.
    # If the list was shared among threads this would cause a race-condition.
    data.spam = getattr(data, 'spam', [])
    with thread:
        data.spam[:] = [thread.name]
    with thread:
        results.append(data.spam == [thread.name])

# Start the threads as in the example above.

assert all(results)  # All results should be True.

Concurrent reads/writes

A common threading issue is the problem of multiple threads reading and/or writing to a data holder concurrently. This problem is solved by implementing a read-write lock. The actual implementation of a read-write lock may differ. You may choose a read-first lock, a write-first lock or just at random.

I'm sure there are examples out there describing such locking techniques. I may write an example later as this is quite a long answer already. ;-)

Notes

Have a look at the threading module documentation and experiment with it a bit. As each threading issue is different, different solutions apply.

While on the subject of threading, have a look at the Python GIL (Global Interpreter Lock). It is important to note that threading may not actually be the best approach in optimizing performance (but this is not your goal). I found this presentation pretty good: https://www.youtube.com/watch?v=zEaosS1U5qY

Dima Tisnek
  • 11,241
  • 4
  • 68
  • 120
siebz0r
  • 18,867
  • 14
  • 64
  • 107
  • I like your hack to cause specific threaded operation order... let me test this locally and see if I get anywhere with that! – Dima Tisnek Nov 27 '13 at 22:02
  • however, having ran your example some 2K times, I didn't get a single failure... perhaps the code in your answer is synchronised a little too well? – Dima Tisnek Nov 27 '13 at 22:48
  • @qarma If you're talking about the `assert results == [False, False, True]`; this is an assertion of the caused failure. When the code if thread-safe it should only contain `True`. The problem lies within the shared variable `spam`. The thread name is stored in it and is changed by another thread. When you run the code a million times, it will **always** produce the same result, thus effectively and reliably causing a 'race-condition'. – siebz0r Nov 28 '13 at 06:19
  • ah! okay I accept that, I wish you started with an occasionally racey example and then showed how to trigger race reliably. Doesn't help identifying the race in general sense though, as you have to manually insert another `with` between the two points of a race. sometimes it's hard to do, e.g. expression `foo[x] += 1` is racey, but you can't inject `with` into it without a rewrite. – Dima Tisnek Nov 28 '13 at 10:28
  • @qarma Yea, that's sort of my main issue with my code. The `with` statements are needed so the thread can trigger another thread. I have looked into how to run a thread line by line (similar to debugger 'steps') but couldn't wrap my head around it. The `with` statements do provide the developer some sense of flexibility to make a thread start/stop at a given point. – siebz0r Nov 28 '13 at 12:22
  • Video Link Broken :( – babygame0ver May 10 '21 at 18:23
4

You can test it by using a lot of threads:

import sys, random, thread
def timeup():
    sys.stdout.write("Timer:: Up %f" % time())

def trdfunc(kt, tid):
    while True :
        sleep(1)
        if not kt.isRunning():
            if kt.start(1, timeup):
                sys.stdout.write("[%d]: started\n" % tid)
        else:
            if random.random() < 0.1:
                kt.stop()
                sys.stdout.write("[%d]: stopped\n" % tid)
        sys.stdout.write("[%d] remains %f\n" % ( tid, kt.timeRemaining))

kt = KitchenTimer()
kt.start(1, timeup)
for i in range(1, 100):
    thread.start_new_thread ( trdfunc, (kt, i) )
trdfunc(kt, 0)

A couple of problem problems I see:

  • When a thread sees the timer as not running and try to start it, the code generally raises an exception due to context switch in between test and start. I think raising an exception is too much. Or you can have an atomic testAndStart function

  • A similar problem occurs with stop. You can implement a testAndStop function.

  • Even this code from the timeRemaining function:

    if self.isRunning():
       self._timeRemaining = self.duration - self._elapsedTime()
    

    Needs some sort of atomicity, perhaps you need to grab a lock before testing isRunning.

If you plan to share this class between threads, you need to address these issues.

perreal
  • 94,503
  • 21
  • 155
  • 181
  • Your code appears to rely on brute force and random, how does that reproduce any problem or this specific problem *reliably*? – Dima Tisnek Nov 21 '13 at 13:04
  • 1
    @qarma, I think determinism, in this case, is a matter of calculating probabilities and setting number of threads and timer interval accordingly. – perreal Nov 21 '13 at 13:48
  • I've tried running your test code, but I can't make sense of the results. What are the expected results? How can I compare them to the actual results? For bonus points, can you update your example to summarize the expected/actual results for us? – doughgle Nov 26 '13 at 02:56
  • 3
    Throwing a lot of threads at an object is a fine way to stress-test a system, but it's absolutely not a substitute for deliberately testing known edge conditions in a deterministic way. This is definitely not a solution to the problem at hand. – Tim Pierce Nov 27 '13 at 06:51
3

In general - this is not viable solution. You can reproduce this race condition by using debugger (set breakpoints in some locations in the code, than, when it hits one of the breakpoints - freeze the thread and run the code until it hits another breakpoint, then freeze this thread and unfreeze the first thread, you can interleave threads execution in any way using this technique).

The problem is - the more threads and code you have, the more ways to interleave side effects they will have. Actually - it will grow exponentially. There is no viable solution to test it in general. It is possible only in some simple cases.

The solution to this problem are well known. Write code that is aware of it's side effects, control side effects with synchronisation primitives like locks, semaphores or queues or use immutable data if its possible.

Maybe more practical way is to use runtime checks to force correct call order. For example (pseudocode):

class RacyObject:
    def __init__(self):
        self.__cnt = 0
        ...

    def isReadyAndLocked(self):
        acquire_object_lock
            if self.__cnt % 2 != 0:
                # another thread is ready to start the Job
                return False
            if self.__is_ready:
                self.__cnt += 1
                return True
            # Job is in progress or doesn't ready yet
            return False
        release_object_lock

    def doJobAndRelease(self):
        acquire_object_lock
            if self.__cnt % 2 != 1:
                raise RaceConditionDetected("Incorrect order")
            self.__cnt += 1
            do_job()
        release_object_lock

This code will throw exception if you doesn't check isReadyAndLock before calling doJobAndRelease. This can be tested easily using only one thread.

obj = RacyObject()
...
# correct usage
if obj.isReadyAndLocked()
    obj.doJobAndRelease()
Evgeny Lazin
  • 9,193
  • 6
  • 47
  • 83
  • Not true at all. Rigorous unit tests routinely expose race conditions by instrumenting the objects to wait until another thread has entered a critical section. – Tim Pierce Nov 27 '13 at 14:08
  • You can't perform this check reliably in practice because this would depend on context switching. I didn't saw any project that checks locking correctness in unit-tests. The only viable solution to this problem is dynamic checks, there is a bunch of race detectors for different languages and operating systems (golang, for example, has one built in) but I didn't know one for python. – Evgeny Lazin Nov 27 '13 at 14:13
  • 1
    Sure you can do it in practice. That's what the threading.Condition, threading.Semaphore and threading.Event classes are for: controlling thread execution to force your code into exercising edge conditions. I gave examples in my answer. – Tim Pierce Nov 27 '13 at 14:16
  • In your example it is possible that thread_2 will start first and thread_1 - second. At least if it will run on multiprocessor/multicore machine and each thread will run on different processor or CPU core. But maybe in python this wont be an issue because of GIL, I don't know for sure. – Evgeny Lazin Nov 27 '13 at 14:26
  • My point is - you can't test for race conditions in practice because number of cases will grow exponentially. I admit that in some cases it is possible to unit-test. – Evgeny Lazin Nov 27 '13 at 14:29
  • 1
    You're right! That's why I noted in my comment that a rigorous unit test would use another semaphore to ensure that the first thread entered its critical section before the second thread started. :-) I don't really agree that you have to check for an exponential number of interactions between massive numbers of threads. Almost all race conditions can be identified as an interaction between two competing processes -- if you have hundreds and hundreds of threads, it's more _likely_ that an unprotected race will arise, but it'll be a race between two of those threads. – Tim Pierce Nov 27 '13 at 14:39
  • And because of that - it is not practical. To many boilerplate code that test only one thing - it tests that lock is acquired. Is it possible to test a condition variable using this approach, for example? :) – Evgeny Lazin Nov 27 '13 at 14:45