6

I'm writing a program in C++. I've noticed that it's gaining a number of threads whose purpose is to do something at intervals, there are 3 or 4 of them. I decided to refactor by writing a scheduler service that the other places that use these threads could subscribe to, which should reduce the number of extra event threads I have running at any time to just one.

I don't have any code that uses this yet; before I start writing it I'd like to know if it's possible, and get some feedback on my design. A brief description of what I'd like to accomplish is this:

To add an event

  1. Caller provides an event and a schedule
  2. Schedule provides the next occurrence of the event
  3. (event, schedule) pair is added to an event queue
  4. interrupt sleeping event thread (i.e. wake it up)

The event thread main loop

  1. try to get the next event in the event queue
  2. If there is no pending event, go straight to 4
  3. Get the time that the next event is supposed to occur
  4. Sleep until next event (or forever if no waiting event)
  5. If sleeping was interrupted for any reason, loop back to 1
  6. If sleeping completed successfully, perform current event
  7. Update queue (remove event, re-insert if it's a repeating event)
  8. Jump back to 1

I've done a bit of research and know that it's possible to interrupt a sleeping thread, and I believe that as long as simultaneous access to the event queue is prevented, there shouldn't be any dangerous behavior. I'd imagine that waking a thread HAS to be possible, java's Thread's sleep() call throws an InterruptedException under some circumstances, and unless it doesn't rely on the operating system's underlying sleep call, it's got to be possible somehow.

Question

Can anyone comment on my approach? Is this a wheel I'd be better off not reinventing? How, specifically, can you interrupt a sleeping thread such that execution resumes at the next instruction, and is it possible to detect this from the interrupted thread?

A note about boost

I'd bet you can write a scheduler with boost, but this compiles and runs on a machine that's, for lack of a better phrase, a load of crap. I've compiled boost programs before on it, and each file that pulls boost in usually takes upwards of 30 seconds to compile. If I can avoid this irritating development obstacle, I'd very much like to.

Addendum - Working Code [Amended as per caf's suggestion]

This is the code I've produced that works. It's been rudimentarily tested but has properly handled both single and repeated events with varying delays.

Here's the event thread's body:

void Scheduler::RunEventLoop()
{
    QueueLock();                   // lock around queue access
    while (threadrunning)
    {
        SleepUntilNextEvent();     // wait for something to happen

        while (!eventqueue.empty() && e.Due())
        {                          // while pending due events exist
            Event e = eventqueue.top();
            eventqueue.pop();

            QueueUnlock();         // unlock
            e.DoEvent();           // perform the event
            QueueLock();           // lock around queue access

            e.Next();              // decrement repeat counter
                                   // reschedule event if necessary
            if (e.ShouldReschedule()) eventqueue.push(e);
        }
    }
    QueueUnlock();                 // unlock
    return;                        // if threadrunning is set to false, exit
}

Here's the sleep function:

void Scheduler::SleepUntilNextEvent()
{
    bool empty = eventqueue.empty();  // check if empty

    if (empty)
    {
        pthread_cond_wait(&eventclock, &queuelock); // wait forever if empty
    }
    else
    {
        timespec t =                  // get absolute time of wakeup
            Bottime::GetMillisAsTimespec(eventqueue.top().Countdown() + 
                                         Bottime::GetCurrentTimeMillis());
        pthread_cond_timedwait(&eventclock, &queuelock, &t); // sleep until event
    }
}

Finally, AddEvent:

void Scheduler::AddEvent(Event e)
{
    QueueLock();
    eventqueue.push(e);
    QueueUnlock();
    NotifyEventThread();
}

Relevant variable declarations:

bool threadrunning;
priority_queue<Event, vector<Event>, greater<Event> > eventqueue;
pthread_mutex_t queuelock; // QueueLock and QueueUnlock operate on this
pthread_cond_t eventclock;

To deal with the issue of generic events, each Event contains a pointer to an object of abstract type action, whos subclasses override action::DoEvent. this method is called from inside Event::DoEvent. actions are 'owned' by their events, i.e. they are automatically deleted if the event no longer needs to be rescheduled.

Wug
  • 12,956
  • 4
  • 34
  • 54
  • 1
    You wait for events, and not "sleep". You should read the pthreads documentation to see what kind of primitives it provides. – Karoly Horvath Aug 19 '12 at 07:46
  • This isn't an event handler, its a scheduler. It needs to get events, which sometime in the future need to happen. It's not about waiting for events, its about waiting ON events. – Wug Aug 19 '12 at 08:47
  • Your current solution contains race conditions - I've made a longer explanation and some suggested improvements in an additional answer. – caf Aug 22 '12 at 03:15
  • Does `NotifyEventThread()` use `pthread_cond_signal()`? – Craig McQueen May 08 '16 at 00:06

4 Answers4

10

What you are looking for is pthread_cond_t object, pthread_cond_timedwait and pthread_cond_wait functions. You could create conditional variable isThereAnyTaskToDo and wait on it in event thread. When new event is added, you just wake event thread with pthread_cond_signal().

Greg
  • 1,660
  • 13
  • 12
  • 4
    Careful there. You're describing an event variable. Condition variables are a little bit more subtle. An event variable says "something has happened", where as a condition variable signals "something *might* have changed." Condition variables are properly used only when paired with mutexes; waits on a condition variable can spuriously wake up. – Managu Aug 19 '12 at 08:02
  • Good point, but Wug already handles it, please see step 5 of his event thread description. – Greg Aug 19 '12 at 08:18
  • I can't use a condition object because something would have to wake it up when it was time for the event to run. – Wug Aug 19 '12 at 08:46
  • I'll look into it. I've used conditions, but never `timedwait` – Wug Aug 19 '12 at 08:51
  • I've produced (but not tested) a class that uses pthread_cond_timedwait. I'll be able to test it later today probably. – Wug Aug 20 '12 at 15:51
3

You have several possibilities both on *NIX platforms and on Windows. Your timer thread should wait using some kind of timed wait on event/conditional variable object. On POSIX platforms you can use pthread_cond_timedwait(). On Windows, you can either choose to compute the necessary time delta and use WaitForSingleObject() on event handle, or you could use combination of event object with CreateTimerQueueTimer() or CreateWaitableTimer(). Boost does also have some synchronization primitives that you could use to implement this with the POSIX-like primitives but portably.

UPDATE:

POSIX does have some timer functionality as well, see create_timer()

wilx
  • 17,697
  • 6
  • 59
  • 114
  • This is purely linux, I don't need to or intend to support windows platforms. I'll accept an answer when I actually implement this, but this is probably going to be the solution I choose. – Wug Aug 19 '12 at 08:48
  • On second thought, I don't think I'm going to use timers because I'll have to worry about deadlocks in otherwise safe synchronous code. I think. I'm pretty sure that if an asynchronous call gets made, and an IO lock is closed and the event handler tries to grab it (which is not beyond the realm of possibility), it will deadlock. – Wug Aug 20 '12 at 15:53
3

I agree with Greg and wilx - pthread_cond_timedwait() can be used to implement the behaviour you're after. I just wanted to add that you can simplify your event thread main loop:

  1. try to get the next event in the event queue
  2. If there is no pending event, go straight to 4
  3. Get the time that the next event is supposed to occur
  4. Wait on condition variable with pthread_cond_timedwait() until next event (or with pthread_cond_wait() if no scheduled events)
  5. Try to get the next event in the event queue
  6. If there are no events that have expired yet, go back to 4
  7. Update queue (remove event, re-insert if it's a repeating event)
  8. Jump back to 5

So you don't care why you woke up - whenever you wake up, you check the current time and run any events that have expired, then go back to waiting. In most cases when a new event is added you'll find that no events have expired, of course - you'll just recalculate the wait time.

You'll probably want to implement the queue as a Priority Queue, so that the next-event-to-expire is always at the front.

Community
  • 1
  • 1
caf
  • 233,326
  • 40
  • 323
  • 462
  • Yeah, that was the idea. I just wasn't sure how to go about the specifics of blocking the event thread. – Wug Aug 19 '12 at 11:32
1

Your current solution contains race conditions - for example, here:

QueueLock();                      // lock around queue access
bool empty = eventqueue.empty();  // check if empty
QueueUnlock();                    // unlock

pthread_mutex_lock(&eventmutex);  // lock event mutex (for condition)
if (empty)
{
    pthread_cond_wait(&eventclock, &eventmutex); // wait forever if empty
}

Consider what happens if the queue is initially empty, but another thread races with this and pushes a new value in between QueueUnlock() and the pthread_mutex_lock(&eventmutex) - the wakeup for the new event will be missed. Note also that in SleepUntilNextEvent() you access eventqueue.top() without holding the queue lock.

The mutex passed to pthread_cond_wait() is supposed to be the mutex protecting the shared state that the signal is relevant to. In this case that "shared state" is the queue itself, so you can fix these problems by using just one mutex protecting the queue:

void Scheduler::RunEventLoop()
{

    pthread_mutex_lock(&queuemutex);
    while (threadrunning)
    {
        while (!eventqueue.empty() && e.Due())
        {                          // while pending due events exist
            Event e = eventqueue.top();
            eventqueue.pop();

            pthread_mutex_unlock(&queuemutex);
            e.DoEvent();           // perform the event
            e.Next();              // decrement repeat counter
            pthread_mutex_lock(&queuemutex);
                                   // reschedule event if necessary
            if (e.ShouldReschedule()) eventqueue.push(e);
        }

        SleepUntilNextEvent();     // wait for something to happen
    }
    pthread_mutex_unlock(&queuemutex);

    return;                        // if threadrunning is set to false, exit
}

/* Note: Called with queuemutex held */
void Scheduler::SleepUntilNextEvent()
{
    if (eventqueue.empty())
    {
        pthread_cond_wait(&eventclock, &queuemutex); // wait forever if empty
    }
    else
    {
        timespec t =                  // get absolute time of wakeup
            Bottime::GetMillisAsTimespec(eventqueue.top().Countdown() + 
                                         Bottime::GetCurrentTimeMillis());
        pthread_cond_timedwait(&eventclock, &queuemutex, &t); // sleep until event
    }
}

Note that pthread_cond_wait() and pthread_cond_timedwait() release the mutex while they are waiting (the mutex is released and the wait begins atomically with respect to the mutex being signalled), so the Scheduler is not holding the mutex while it is sleeping.

caf
  • 233,326
  • 40
  • 323
  • 462
  • 1
    I just got done identifying a diabolical bug that arose when I stuck this into the rest of the code I was using. I'm going to ask a self-answered question later with that, so stay tuned. – Wug Aug 22 '12 at 04:45