I am answering my own question. After reading the man pages over and over again, this is what I have come up with:
To prevent starvation of the publisher process, I can use a second semaphore that is locked only by the publisher. The subscribers try to lock the reader semaphore only while the publisher's (writer) semaphore is unlocked. See code below:
Publisher:
#include <fcntl.h> //for flag O_CREAT, O_EXCL..
#include <sys/stat.h> //for mode 0666
#include <semaphore.h> //for sem_open, sem_close, sem..
#include <stdio.h> //for printf
#define MAX_READERS 100 //number of units held by the reader semaphore

/*
 * Publisher (POSIX named semaphores).
 *
 * Opens (creating if needed) "ReaderSem" with MAX_READERS units and
 * "WriterSem" with 1 unit. It takes the writer semaphore, waits for the
 * user to press enter, takes every unit of the reader semaphore, gives the
 * writer semaphore back, and finally returns all reader units.
 * Return values are not checked.
 */
int main()
{
    sem_t *reader_slots = sem_open("ReaderSem", O_CREAT, 0666,
                                   MAX_READERS);
    sem_t *writer_gate = sem_open("WriterSem", O_CREAT, 0644,
                                  1);
    int remaining;

    //take the writer semaphore before touching any reader unit
    sem_wait(writer_gate);
    printf("writer semaphore locked. press enter to continue:");
    getchar();

    //take the reader units one at a time until none are left
    remaining = MAX_READERS;
    while (remaining > 0) {
        sem_wait(reader_slots);
        remaining--;
    }

    //all reader units are held, so the writer semaphore can be released
    sem_post(writer_gate);

    //write to shared memory (skipped)

    //give every reader unit back
    remaining = MAX_READERS;
    while (remaining > 0) {
        sem_post(reader_slots);
        remaining--;
    }

    return 0;
}
Subscriber Code:
#include <semaphore.h>
#include <stdio.h>
#include <unistd.h> //for sleep
/*
 * Subscriber (POSIX named semaphores).
 *
 * Opens the existing "ReaderSem" and "WriterSem", polls the writer
 * semaphore once per second until its value is at least 1, then takes one
 * unit of the reader semaphore around the (skipped) read.
 *
 * The check of the writer semaphore and the sem_wait() on the reader
 * semaphore are two separate steps, so they are not atomic together.
 * Return values are not checked.
 */
int main()
{
    sem_t *rsem = sem_open("ReaderSem", 0); //get the reader sem
    sem_t *wsem = sem_open("WriterSem", 0); //get the writer sem
    int wsem_val = 0;

    for (;;) {
        sem_getvalue(wsem, &wsem_val);
        printf("writer sem val = %d\n", wsem_val);
        if (wsem_val >= 1) {
            //writer sem is unlocked: go straight on instead of sleeping
            //once more, which would only widen the gap between this check
            //and the sem_wait below
            break;
        }
        sleep(1); //writer sem is locked; look again in a second
    }

    sem_wait(rsem); //consume 1 semaphore
    //read shared memory
    sem_post(rsem); //release 1 semaphore

    //release this process's handles to the named semaphores
    sem_close(rsem);
    sem_close(wsem);
    return 0;
}
A polling while loop is probably not recommended. There is no 'wait-for-zero' operation in POSIX semaphores, but there is one in System V semaphores. Also, System V semaphores are widely available. So, I am choosing System V over POSIX semaphores. Find the System V implementation below:
Publisher:
#include <sys/sem.h> //for systemV semaphores
#include <stdio.h> //for printf and getchar
#include <stdlib.h> //for malloc
#define MAX_READERS 100 //initial value of the reader semaphore

/*
 * Publisher (System V semaphores).
 *
 * Creates a set of two semaphores: index 0 is the writer flag (0 means
 * "no writer pending"), index 1 is the reader counter (starts at
 * MAX_READERS). It raises the writer flag, takes all MAX_READERS reader
 * units in one operation, lowers the writer flag, performs the (skipped)
 * write, returns the reader units and finally removes the set.
 * Return values are not checked.
 */
int main()
{
    key_t key = (key_t)0xfeededdf; //any unique val. you could use ftok
    int semid = semget(key, 2, //creates a semaphore set with 2 sems
                       IPC_CREAT | IPC_EXCL | 0666);

    //initialize the sem.. note: creation and init is not atomic
    union semun {
        int val;
        struct semid_ds *buf;
        unsigned short *array;
    } arg;
    //SETALL only reads the array during the call, so automatic storage is
    //enough; nothing to allocate or free
    unsigned short init_vals[2] = { 0, MAX_READERS };
    arg.array = init_vals;
    semctl(semid, 0, SETALL, arg);
    //end init

    printf("press enter to lock:");
    getchar();

    //lock the writer semaphore
    struct sembuf sb;
    sb.sem_num = 0; //0th semaphore is writer
    sb.sem_op = 1; //increment by 1 (0 -> 1)
    sb.sem_flg = IPC_NOWAIT; //should not have to wait
    //NOTE(review): no SEM_UNDO here, so the flag presumably stays raised if
    //this process dies before lowering it - confirm whether that is intended
    semop(semid, &sb, 1);
    //end lock writer semaphore

    //lock all the reader semaphores
    sb.sem_num = 1; //semaphore 1 is reader
    sb.sem_op = -MAX_READERS; //lock all together.. no loops
    sb.sem_flg = SEM_UNDO; //undo this change when process terminates
    semop(semid, &sb, 1);

    //unlock the writer semaphore
    sb.sem_num = 0; //0th semaphore is writer
    sb.sem_op = -1; //decrement by 1 (1 -> 0)
    sb.sem_flg = IPC_NOWAIT; //should not have to wait
    semop(semid, &sb, 1);
    //end unlock writer

    //Write to shared memory (skipped)
    printf("press enter to unlock:");
    getchar();

    //unlock all the reader semaphores
    sb.sem_num = 1; //semaphore 1 is reader
    sb.sem_op = MAX_READERS; //unlock all together
    sb.sem_flg = SEM_UNDO; //undo this change when process terminates
    semop(semid, &sb, 1);

    printf("press enter to delete semaphore and exit:");
    getchar();
    semctl(semid, 0, IPC_RMID);
    return 0;
}
Subscribers:
#include <stdio.h> //for printf and getchar
#include <sys/sem.h> //for systemV semaphores
/*
 * Subscriber (System V semaphores).
 *
 * Looks up the existing semaphore set by key. With one semop() call it
 * waits for semaphore 0 (writer) to be zero and decrements semaphore 1
 * (reader) by one. After the (skipped) read it increments semaphore 1
 * again. Return values are not checked.
 */
int main()
{
    const key_t set_key = (key_t)0xfeededdf; //should use ftok instead
    const int set_id = semget(set_key, 0, 0); //get the existing semaphore

    printf("press enter to lock reader semaphore:");
    getchar();

    //both operations are submitted together in one semop() call
    struct sembuf acquire[2] = {
        //writer semaphore: wait for zero, nothing to undo
        { .sem_num = 0, .sem_op = 0, .sem_flg = 0 },
        //reader semaphore: take one unit, undone when process terminates
        { .sem_num = 1, .sem_op = -1, .sem_flg = SEM_UNDO },
    };
    semop(set_id, acquire, 2);

    //read shared memory (skipped)
    printf("press enter to unlock reader semaphore and exit:");
    getchar();

    //give the unit back; no need to wait for the writer here.
    //SEM_UNDO is kept on the increment as well so that it offsets the
    //adjustment recorded for the decrement above (see semadj in semop docs)
    struct sembuf release = { .sem_num = 1, .sem_op = 1, .sem_flg = SEM_UNDO };
    semop(set_id, &release, 1);

    return 0;
}
Source: Chapter 15 (interprocess communication) in Stevens book
P.S: I have omitted return value checking (for errors) to keep the code small.