2

I have a state machine implementation in a library which runs on Linux. The main loop of the program is to simply wait until enough time has passed to require the next execution of the state machine.

At them moment I have a loop which is similar to the following psuedo-code:

while( 1 )
{
    while( StateTicks() > 0 )
        StateMachine();

    Pause( 10ms );
}

Where StateTicks may return a tick every 50ms or so. The shorter I make the Pause() the more CPU time I use in the program.

Is there a better way to test for a period of time passing, perhaps based on Signals? I'd rather halt execution until StateTicks() is > 0 rather than have the Pause() call at all.

Underneath the hood of the state machine implementation StateTicks uses clock_gettime(PFT_CLOCK ...) which works well. I'm keen to keep that timekeeping because if a StateMachine() call takes longer than a state machine tick this implementation will catchup.

Pause uses nanosleep to achieve a reasonably accurate pause time.

Perhaps this is already the best way, but it doesn't seem particularly graceful.

Brian Sidebotham
  • 1,706
  • 11
  • 15

2 Answers2

5

Create a periodic timer using timer_create(), and have it call sem_post() on a "timer tick semaphore".

To avoid losing ticks, I recommend using a real-time signal, perhaps SIGRTMIN+0 or SIGRTMAX-0. sem_post() is async-signal-safe, so you can safely use it in a signal handler.

Your state machine simply waits on the semaphore; no other timekeeping needed. If you take too long to process a tick, the following sem_wait() will not block, but return immediately. Essentially, the semaphore counts "lost" ticks.

Example code (untested!):

#define  _POSIX_C_SOURCE 200809L
#include <semaphore.h>
#include <signal.h>
#include <errno.h>
#include <time.h>

#define   TICK_SIGNAL (SIGRTMIN+0)

static timer_t tick_timer;
static sem_t   tick_semaphore;

static void tick_handler(int signum, siginfo_t *info, void *context)
{
    if (info && info->si_code == SI_TIMER) {
        const int saved_errno = errno;
        sem_post((sem_t *)info->si_value.sival_ptr);
        errno = saved_errno;
    }
}

static int tick_setup(const struct timespec interval)
{
    struct sigaction  act;
    struct sigevent   evt;
    struct itimerspec spec;

    if (sem_init(&tick_semaphore, 0, 0))
        return errno;

    sigemptyset(&act.sa_mask);
    act.sa_handler = tick_handler;
    act.sa_flags = 0;
    if (sigaction(TICK_SIGNAL, &act, NULL))
        return errno;

    evt.sigev_notify = SIGEV_SIGNAL;
    evt.sigev_signo = TICK_SIGNAL;
    evt.sigev_value.sival_ptr = &tick_semaphore;
    if (timer_create(CLOCK_MONOTONIC, &evt, &tick_timer))
        return errno;

    spec.it_interval = interval;
    spec.it_value = interval;
    if (timer_settime(tick_timer, 0, &spec, NULL))
        return errno;

    return 0;
}

with the tick loop being simply

    if (tick_setup(some_interval))
        /* failed, see errno; abort */

    while (!sem_wait(&tick_semaphore)) {

        /* process tick */

    }

If you support more than one concurrent state, the one signal handler suffices. Your state typically would include

    timer_t          timer;
    sem_t            semaphore;
    struct timespec  interval;

and the only tricky thing is to make sure there is no pending timer signal when destroying the state that signal would access.

Because signal delivery will interrupt any blocking I/O in the thread used for the signal delivery, you might wish to set up a special thread in your library to handle the timer tick realtime signals, with the realtime signal blocked in all other threads. You can mark your library initialization function __attribute__((constructor)), so that it is automatically executed prior to main().

Optimally, you should use the same thread that does the tick processing for the signal delivery. Otherwise there will be some small jitter or latency in the tick processing, if the signal was delivered using a different CPU core than the one that is running the tick processing.


Basile Starynkevitch's answer jogged my memory about the latencies involved in waiting and signal delivery: If you use nanosleep() and clock_gettime(CLOCK_MONOTONIC,), you can adjust the sleep times to account for the typical latencies.

Here's a quick test program using clock_gettime(CLOCK_MONOTONIC,) and nanosleep():

#define _POSIX_C_SOURCE 200809L
#include <sys/select.h>
#include <time.h>

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>

static const long   tick_latency = 75000L; /* 0.75 ms */
static const long   tick_adjust  = 75000L; /* 0.75 ms */

typedef struct {
    struct timespec  next;
    struct timespec  tick;
} state;

void state_init(state *const s, const double ticks_per_sec)
{
    if (ticks_per_sec > 0.0) {
        const double interval = 1.0 / ticks_per_sec;
        s->tick.tv_sec = (time_t)interval;
        s->tick.tv_nsec = (long)(1000000000.0 * (interval - (double)s->tick.tv_sec));
        if (s->tick.tv_nsec < 0L)
            s->tick.tv_nsec = 0L;
        else
        if (s->tick.tv_nsec > 999999999L)
            s->tick.tv_nsec = 999999999L;
    } else {
        s->tick.tv_sec = 0;
        s->tick.tv_nsec = 0L;
    }
    clock_gettime(CLOCK_MONOTONIC, &s->next);
}

static unsigned long count;

double state_tick(state *const s)
{
    struct timespec now, left;

    /* Next tick. */
    s->next.tv_sec += s->tick.tv_sec;
    s->next.tv_nsec += s->tick.tv_nsec;
    if (s->next.tv_nsec >= 1000000000L) {
        s->next.tv_nsec -= 1000000000L;
        s->next.tv_sec++;
    }

    count = 0UL;

    while (1) {

        /* Get current time. */
        clock_gettime(CLOCK_MONOTONIC, &now);

        /* Past tick time? */
        if (now.tv_sec > s->next.tv_sec ||
            (now.tv_sec == s->next.tv_sec &&
             now.tv_nsec >= s->next.tv_nsec - tick_latency))
            return (double)(now.tv_sec - s->next.tv_sec)
                 + (double)(now.tv_nsec - s->next.tv_nsec) / 1000000000.0;

        /* Calculate duration to wait */
        left.tv_sec = s->next.tv_sec - now.tv_sec;
        left.tv_nsec = s->next.tv_nsec - now.tv_nsec - tick_adjust;
        if (left.tv_nsec >= 1000000000L) {
            left.tv_nsec -= 1000000000L;
            left.tv_sec++;
        } else
        if (left.tv_nsec < -1000000000L) {
            left.tv_nsec += 2000000000L;
            left.tv_sec += 2;
        } else
        if (left.tv_nsec < 0L) {
            left.tv_nsec += 1000000000L;
            left.tv_sec--;
        }

        count++;

        nanosleep(&left, NULL);
    }
}


int main(int argc, char *argv[])
{
    double  rate, jitter;
    long    ticks, i;
    state   s;
    char    dummy;

    if (argc != 3 || !strcmp(argv[1], "-h") || !strcmp(argv[1], "--help")) {
        fprintf(stderr, "\n");
        fprintf(stderr, "Usage: %s [ -h | --help ]\n", argv[0]);
        fprintf(stderr, "       %s TICKS_PER_SEC TICKS\n", argv[0]);
        fprintf(stderr, "\n");
        return 1;
    }

    if (sscanf(argv[1], " %lf %c", &rate, &dummy) != 1 || rate <= 0.0) {
        fprintf(stderr, "%s: Invalid tick rate.\n", argv[1]);
        return 1;
    }
    if (sscanf(argv[2], " %ld %c", &ticks, &dummy) != 1 || ticks < 1L) {
        fprintf(stderr, "%s: Invalid tick count.\n", argv[2]);
        return 1;
    }

    state_init(&s, rate);
    for (i = 0L; i < ticks; i++) {
        jitter = state_tick(&s);
        if (jitter > 0.0)
            printf("Tick %9ld: Delayed   %9.6f ms, %lu sleeps\n", i+1L, +1000.0 * jitter, count);
        else
        if (jitter < 0.0)
            printf("Tick %9ld: Premature %9.6f ms, %lu sleeps\n", i+1L, -1000.0 * jitter, count);
        else
            printf("Tick %9ld: Exactly on time, %lu sleeps\n", i+1L, count);
        fflush(stdout);
    }

    return 0;
}

Above, tick_latency is the number of nanoseconds you're willing to accept a "tick" in advance, and tick_adjust is the number of nanoseconds you subtract from each sleep duration.

The best values for those are highly configuration-specific, and I haven't got a robust method for estimating them. Hardcoding them (to 0.75ms as above) does not sound too good to me either; perhaps using command-line options or environment values to let users control it, and default to zero would be better.

Anyway, compiling the above as

gcc -O2 test.c -lrt -o test

and running a 500-tick test at 50Hz tick rate,

./test 50 500 | sort -k 4

shows that on my machine, the ticks are accepted within 0.051 ms (51 µs) of the desired moment. Even reducing the priority does not seem to affect it much. A test using 5000 ticks at 5kHz rate (0.2ms per tick),

nice -n 19 ./test 5000 5000 | sort -k 4

yields similar results -- although I did not bother to check what happens if the machine load changes during a run.

In other words, preliminary tests on a single machine indicates it might be a viable option, so you might wish to test the scheme on different machines and under different loads. It is much more precise than I expected on my own machine (Ubuntu 3.11.0-24-generic on x86_64, running on an AMD Athlon II X4 640 CPU).

This approach has the interesting property that you can easily use a single thread to maintain multiple states, even if they use different tick rates. You only need to check which state has the next tick (earliest ->next time), nanosleep() if that occurs in the future, and process the tick, advancing that state to the next tick.

Questions?

Community
  • 1
  • 1
Nominal Animal
  • 38,216
  • 5
  • 59
  • 86
  • 1
    Excellent answer, thanks very much. That gives me some great food for thought. I use multiple state machines per process, but this should be adaptable to get rid of the Pause() statements. Thank-you. – Brian Sidebotham Jul 08 '14 at 12:41
0

In addition of Nominal Animal's answer :

If the Pause time is several milliseconds, you might use poll(2) or perhaps nanosleep(2) (you might compute the remaining time to sleep, e.g. using clock_gettime(2) with CLOCK_REALTIME ...)

If you care about the fact that StateMachine may take several milliseconds (or a large fraction of a millisecond) and you want exactly a 10 millisecond period, consider perhaps using a poll based event loop which uses the Linux specific timerfd_create(2)

See also time(7), and this, that answers (to question about poll etc...)

Community
  • 1
  • 1
Basile Starynkevitch
  • 223,805
  • 18
  • 296
  • 547