Create a periodic timer using timer_create(), and have it call sem_post() on a "timer tick semaphore".
To avoid losing ticks, I recommend using a real-time signal, perhaps SIGRTMIN+0 or SIGRTMAX-0. sem_post() is async-signal-safe, so you can safely use it in a signal handler.
Your state machine simply waits on the semaphore; no other timekeeping is needed. If you take too long to process a tick, the following sem_wait() will not block, but return immediately. Essentially, the semaphore counts "lost" ticks.
Example code (untested!):
#define _POSIX_C_SOURCE 200809L
#include <semaphore.h>
#include <signal.h>
#include <errno.h>
#include <time.h>
/* Real-time signal raised by the periodic timer on every tick. */
#define TICK_SIGNAL (SIGRTMIN+0)
/* Timer handle; filled in by timer_create() in tick_setup(). */
static timer_t tick_timer;
/* Tick semaphore: tick_setup() passes its address to the timer, and
 * tick_handler() posts it once per delivered timer signal. */
static sem_t tick_semaphore;
/*
 * Signal handler for timer ticks.
 *
 * Only signals generated by a POSIX timer (si_code == SI_TIMER) are acted
 * upon; for those, the semaphore whose address the timer carries in
 * si_value.sival_ptr is posted once.  errno is saved and restored around
 * sem_post() so that the interrupted code never sees it change.
 *
 * signum and context are unused.
 */
static void tick_handler(int signum, siginfo_t *info, void *context)
{
    int errno_on_entry;

    (void)signum;
    (void)context;

    /* Ignore anything that did not originate from a timer. */
    if (!info || info->si_code != SI_TIMER)
        return;

    errno_on_entry = errno;
    sem_post((sem_t *)info->si_value.sival_ptr);
    errno = errno_on_entry;
}
/*
 * tick_setup() - start the periodic tick timer.
 *
 * Initializes tick_semaphore, installs tick_handler() for TICK_SIGNAL,
 * and creates and arms a CLOCK_MONOTONIC timer that raises TICK_SIGNAL
 * every 'interval', with the first expiry one 'interval' from now.
 * The timer carries the address of tick_semaphore, which the handler posts.
 *
 * Returns 0 on success.  On failure returns the errno value of the call
 * that failed (errno is left set to the same value), and the semaphore and
 * timer created so far are released again.
 */
static int tick_setup(const struct timespec interval)
{
    /* Zero-initialize so that no field is passed to the kernel uninitialized. */
    struct sigaction act = {0};
    struct sigevent evt = {0};
    struct itimerspec spec;
    int err;

    if (sem_init(&tick_semaphore, 0, 0))
        return errno;

    /* tick_handler() takes three arguments and reads siginfo, so it must
     * be installed through sa_sigaction with SA_SIGINFO set; otherwise its
     * 'info' argument is undefined. */
    sigemptyset(&act.sa_mask);
    act.sa_sigaction = tick_handler;
    act.sa_flags = SA_SIGINFO;
    if (sigaction(TICK_SIGNAL, &act, NULL)) {
        err = errno;
        goto fail_semaphore;
    }

    evt.sigev_notify = SIGEV_SIGNAL;
    evt.sigev_signo = TICK_SIGNAL;
    evt.sigev_value.sival_ptr = &tick_semaphore;
    if (timer_create(CLOCK_MONOTONIC, &evt, &tick_timer)) {
        err = errno;
        goto fail_semaphore;
    }

    spec.it_interval = interval;
    spec.it_value = interval;
    if (timer_settime(tick_timer, 0, &spec, NULL)) {
        err = errno;
        /* The timer was never armed, so no tick signal can be pending. */
        timer_delete(tick_timer);
        goto fail_semaphore;
    }

    return 0;

fail_semaphore:
    sem_destroy(&tick_semaphore);
    errno = err;
    return err;
}
with the tick loop being simply
if (tick_setup(some_interval))
/* failed, see errno; abort */
while (!sem_wait(&tick_semaphore)) {
/* process tick */
}
If you support more than one concurrent state, the one signal handler suffices. Your state typically would include
/* Typical per-state members when several states tick concurrently. */
/* This state's periodic timer. */
timer_t timer;
/* This state's tick semaphore (presumably the one its timer carries in sival_ptr). */
sem_t semaphore;
/* This state's tick interval. */
struct timespec interval;
and the only tricky thing is to make sure there is no pending timer signal when destroying the state that signal would access.
Because signal delivery will interrupt any blocking I/O in the thread used for the signal delivery, you might wish to set up a special thread in your library to handle the timer tick realtime signals, with the realtime signal blocked in all other threads. You can mark your library initialization function __attribute__((constructor)), so that it is automatically executed prior to main().
Optimally, you should use the same thread that does the tick processing for the signal delivery. Otherwise there will be some small jitter or latency in the tick processing, if the signal was delivered using a different CPU core than the one that is running the tick processing.
Basile Starynkevitch's answer jogged my memory about the latencies involved in waiting and signal delivery: If you use nanosleep() and clock_gettime(CLOCK_MONOTONIC,), you can adjust the sleep times to account for the typical latencies.
Here's a quick test program using clock_gettime(CLOCK_MONOTONIC,) and nanosleep():
#define _POSIX_C_SOURCE 200809L
#include <sys/select.h>
#include <time.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
/* Both values are in nanoseconds: 75000 ns = 75 us = 0.075 ms. */
static const long tick_latency = 75000L; /* 0.075 ms: how early a tick may be accepted */
static const long tick_adjust = 75000L; /* 0.075 ms: subtracted from each sleep duration */
/* Tick schedule of one state. */
typedef struct {
/* CLOCK_MONOTONIC time of the next tick; initialized to "now" by
 * state_init() and advanced by one interval in each state_tick(). */
struct timespec next;
/* Tick interval; computed from the tick rate by state_init(). */
struct timespec tick;
} state;
/*
 * state_init() - prepare a tick schedule.
 *
 * Sets s->tick to the tick interval 1/ticks_per_sec, split into whole
 * seconds and nanoseconds (nanoseconds clamped to 0..999999999), or to
 * zero when ticks_per_sec is not positive.  s->next is set to the current
 * CLOCK_MONOTONIC time, so the first state_tick() waits one interval.
 */
void state_init(state *const s, const double ticks_per_sec)
{
    time_t whole_seconds = 0;
    long nanoseconds = 0L;

    if (ticks_per_sec > 0.0) {
        const double period = 1.0 / ticks_per_sec;

        whole_seconds = (time_t)period;
        nanoseconds = (long)(1000000000.0 * (period - (double)whole_seconds));

        /* Guard against rounding pushing the fraction out of range. */
        if (nanoseconds < 0L)
            nanoseconds = 0L;
        else if (nanoseconds > 999999999L)
            nanoseconds = 999999999L;
    }

    s->tick.tv_sec = whole_seconds;
    s->tick.tv_nsec = nanoseconds;

    clock_gettime(CLOCK_MONOTONIC, &s->next);
}
/* Number of nanosleep() calls made by the most recent state_tick(). */
static unsigned long count;

/*
 * state_tick() - wait for the next tick of the schedule *s.
 *
 * Advances s->next by one tick interval, then sleeps (in one or more
 * nanosleep() calls, each shortened by tick_adjust nanoseconds) until the
 * current CLOCK_MONOTONIC time is no more than tick_latency nanoseconds
 * before s->next.
 *
 * Returns the offset, in seconds, of the moment the tick was accepted
 * relative to the scheduled moment: positive if late, negative if early.
 *
 * tick_latency and tick_adjust are assumed to be in 0..999999999.
 */
double state_tick(state *const s)
{
    struct timespec now, left;

    /* Next tick. */
    s->next.tv_sec += s->tick.tv_sec;
    s->next.tv_nsec += s->tick.tv_nsec;
    if (s->next.tv_nsec >= 1000000000L) {
        s->next.tv_nsec -= 1000000000L;
        s->next.tv_sec++;
    }

    count = 0UL;

    while (1) {
        /* Get current time. */
        clock_gettime(CLOCK_MONOTONIC, &now);

        /* Time remaining until the tick, normalized so that
         * 0 <= tv_nsec < 1e9 and tv_sec alone carries the sign.  Working
         * on the normalized difference makes the checks below correct
         * even when 'now' and 'next' fall in different seconds. */
        left.tv_sec = s->next.tv_sec - now.tv_sec;
        left.tv_nsec = s->next.tv_nsec - now.tv_nsec;
        if (left.tv_nsec < 0L) {
            left.tv_nsec += 1000000000L;
            left.tv_sec--;
        }

        /* Past the tick time, or close enough to accept it? */
        if (left.tv_sec < 0 ||
            (left.tv_sec == 0 && left.tv_nsec <= tick_latency))
            return (double)(now.tv_sec - s->next.tv_sec)
                 + (double)(now.tv_nsec - s->next.tv_nsec) / 1000000000.0;

        /* Shorten the sleep to compensate for wake-up latency. */
        left.tv_nsec -= tick_adjust;
        if (left.tv_nsec < 0L) {
            left.tv_nsec += 1000000000L;
            left.tv_sec--;
        }

        /* Less than tick_adjust left: poll the clock instead of sleeping. */
        if (left.tv_sec < 0)
            continue;

        count++;
        nanosleep(&left, NULL);
    }
}
/*
 * Test driver: runs TICKS ticks at TICKS_PER_SEC ticks per second and
 * prints, for each tick, how late or early it was accepted and how many
 * sleeps it took.  Returns 0 on success, 1 on usage or argument errors.
 */
int main(int argc, char *argv[])
{
    state schedule;
    double ticks_per_sec, offset;
    long total, done;
    char trailing; /* catches garbage following a number */

    if (argc != 3 || strcmp(argv[1], "-h") == 0 || strcmp(argv[1], "--help") == 0) {
        const char *const prog = argv[0];

        fprintf(stderr, "\n");
        fprintf(stderr, "Usage: %s [ -h | --help ]\n", prog);
        fprintf(stderr, " %s TICKS_PER_SEC TICKS\n", prog);
        fprintf(stderr, "\n");
        return 1;
    }

    /* Exactly one conversion must succeed; a second one means trailing junk. */
    if (sscanf(argv[1], " %lf %c", &ticks_per_sec, &trailing) != 1 || ticks_per_sec <= 0.0) {
        fprintf(stderr, "%s: Invalid tick rate.\n", argv[1]);
        return 1;
    }

    if (sscanf(argv[2], " %ld %c", &total, &trailing) != 1 || total < 1L) {
        fprintf(stderr, "%s: Invalid tick count.\n", argv[2]);
        return 1;
    }

    state_init(&schedule, ticks_per_sec);

    for (done = 0L; done < total; done++) {
        offset = state_tick(&schedule);

        if (offset > 0.0) {
            printf("Tick %9ld: Delayed %9.6f ms, %lu sleeps\n", done + 1L, +1000.0 * offset, count);
        } else if (offset < 0.0) {
            printf("Tick %9ld: Premature %9.6f ms, %lu sleeps\n", done + 1L, -1000.0 * offset, count);
        } else {
            printf("Tick %9ld: Exactly on time, %lu sleeps\n", done + 1L, count);
        }

        fflush(stdout);
    }

    return 0;
}
Above, tick_latency is the number of nanoseconds in advance that you're willing to accept a "tick", and tick_adjust is the number of nanoseconds you subtract from each sleep duration.
The best values for those are highly configuration-specific, and I haven't got a robust method for estimating them. Hardcoding them (to 75000 ns, i.e. 0.075 ms, as above) does not sound too good to me either; it would perhaps be better to let users control them through command-line options or environment variables, and to default to zero.
Anyway, compiling the above as
gcc -O2 test.c -lrt -o test
and running a 500-tick test at 50Hz tick rate,
./test 50 500 | sort -k 4
shows that on my machine, the ticks are accepted within 0.051 ms (51 µs) of the desired moment. Even reducing the priority does not seem to affect it much. A test using 5000 ticks at 5kHz rate (0.2ms per tick),
nice -n 19 ./test 5000 5000 | sort -k 4
yields similar results -- although I did not bother to check what happens if the machine load changes during a run.
In other words, preliminary tests on a single machine indicate it might be a viable option, so you might wish to test the scheme on different machines and under different loads. It is much more precise than I expected on my own machine (Ubuntu 3.11.0-24-generic on x86_64, running on an AMD Athlon II X4 640 CPU).
This approach has the interesting property that you can easily use a single thread to maintain multiple states, even if they use different tick rates. You only need to check which state has the next tick (earliest ->next time), nanosleep() if that occurs in the future, and process the tick, advancing that state to the next tick.
Questions?