2

I have a publisher process which writes data to shared memory. Several subscribers read the data from the shared memory. I am using the posix semaphores as shown below (simplified to keep it minimal). Publisher code:

#include <fcntl.h>     //for flag O_CREAT, O_EXCL..
#include <sys/stat.h>  //for mode 0666
#include <semaphore.h> //for sem_open, sem_close, sem..
#include <stdio.h>     //for printf
#define MAX_READERS 100
int main()
{
    int i;
    //create the semaphore
    sem_t *sem = sem_open("AllHailMySemaphore", O_CREAT, 0666, MAX_READERS);
    //lock all the semaphores
    for(i=0; i<MAX_READERS; i++)
        sem_wait(sem);   //will this wait forever?
    //write to shared memory (skipped)
    //unlock all the semaphores
    for(i=0; i<MAX_READERS; i++)
        sem_post(sem);
    return 0;
}

Subscriber Code:

#include <semaphore.h>
#include <stdio.h>
int main()
{
    sem_t *sem = sem_open("AllHailMySemaphore", 0);    //get the sem
    sem_wait(sem);   //consume 1 semaphore
    //read shared memory
    sem_post(sem);   //release 1 semaphore
    return 0;
}

There is only 1 publisher and 100 subscribers; I suspect that the publisher will starve (has to wait forever to lock all the semaphores) in the production environment. Is there a way give preference to the publisher?

Foo Bar
  • 56
  • 5
  • 1
    Use reader-writer-locks rather than semaphores? – EOF Aug 16 '16 at 15:41
  • @EOF: Thanks for the suggestion. I found [this link](http://stackoverflow.com/questions/2190090/how-to-prevent-writer-starvation-in-a-read-write-lock-in-pthreads) describing it. But it is in the multi-threaded context. I am dealing with multiple processes. I could possibly use it by creating the lock inside shared-memory. – Foo Bar Aug 20 '16 at 09:33

1 Answers1

0

I am answering my own question. After reading the man pages over and over again, this is what I have come up with:

To prevent starvation of the publisher process, I can use another semaphore which is locked only by the publisher. The subscribers will wait for locking only if the publisher semaphore is unlocked. See code below: Publisher:

#include <fcntl.h>     //for flag O_CREAT, O_EXCL..
#include <sys/stat.h>  //for mode 0666
#include <semaphore.h> //for sem_open, sem_close, sem..
#include <stdio.h>     //for printf
#define MAX_READERS 100
int main()
{
    int i;
    //create the semaphore
    sem_t *rsem = sem_open("ReaderSem", O_CREAT, 0666,
            MAX_READERS);
    sem_t *wsem = sem_open("WriterSem", O_CREAT, 0644,
            1);
    //lock the writer semaphore
    sem_wait(wsem);
    printf("writer semaphore locked. press enter to continue:");
    getchar();
    //lock all the reader semaphores
    for(i=0; i<MAX_READERS; i++)
        sem_wait(rsem);
    //unlock the writer semaphore
    sem_post(wsem);
    //write to shared memory (skipped)
    //unlock all the reader semaphores
    for(i=0; i<MAX_READERS; i++)
        sem_post(rsem);
    return 0;
}

Subscriber Code:

#include <semaphore.h>
#include <stdio.h>
int main()
{
    sem_t *rsem = sem_open("ReaderSem", 0);    //get the reader sem
    sem_t *wsem = sem_open("WriterSem", 0);    //get the writer sem
    int wsem_val = 0;
    while(wsem_val<1){ //writer sem is locked
        sem_getvalue(wsem, &wsem_val);
        printf("writer sem val = %d\n", wsem_val);
        sleep(1);
    }
    sem_wait(rsem);   //consume 1 semaphore
    //read shared memory
    sem_post(rsem);   //release 1 semaphore
    return 0;
}

A waiting while loop is probably not recommended. There is no 'wait-for-zero' operation in posix semaphore, but it is there is SystemV semaphores. Also, SystemV semaphores are widely available. So, I am choosing SystemV over Posix semaphores. Find the systemV implementation below: Publisher:

#include <sys/sem.h>  //for systemV semaphores
#include <stdio.h>    //for printf and getchar
#include <stdlib.h>   //for malloc
#define MAX_READERS 100
int main()
{
    key_t key = (key_t)0xfeededdf; //any unique val. you could use ftok
    int semid = semget(key, 2,     //creates a semaphore set with 2 sems
            IPC_CREAT | IPC_EXCL | 0666);
    //initialize the sem.. note: creation and init is not atomic
    union semun {
        int val;
        struct semid_ds *buf;
        ushort *array;
    } arg;
    arg.array = (ushort*)malloc(sizeof(ushort)*2);
    arg.array[0] = 0;
    arg.array[1] = MAX_READERS;
    semctl(semid, 0, SETALL, arg);
    //end init
    printf("press enter to lock:");
    getchar();
    //lock the writer semaphore
    struct sembuf sb;
    sb.sem_num = 0;           //0th semaphore is writer
    sb.sem_op = 1;            //set it to 1
    sb.sem_flg = IPC_NOWAIT;  //should not have to wait
    semop(semid, &sb, 1);
    //end lock writer semaphore
    //lock all the reader semaphores
    sb.sem_num = 1;            //1th semaphore is reader
    sb.sem_op = -MAX_READERS;  //lock all together.. no loops
    sb.sem_flg = SEM_UNDO;     //undo this change when process terminates
    semop(semid, &sb, 1);
    //unlock the writer semaphore
    sb.sem_num = 0;           //0th semaphore is writer
    sb.sem_op = -1;           //make it 0
    sb.sem_flg = IPC_NOWAIT;  //should not have to wait
    semop(semid, &sb, 1);
    //end unlock writer
    //Write to shared memory (skipped)
    printf("press enter to unlock:");
    getchar();
    //unlock all the reader semaphores
    sb.sem_num = 1;           //1th semaphore is reader
    sb.sem_op = MAX_READERS;  //unlock all together
    sb.sem_flg = SEM_UNDO;    //undo this change when process terminates
    semop(semid, &sb, 1);
    printf("press enter to delete semaphore and exit:");
    getchar();
    semctl(semid, 0, IPC_RMID);
    return 0;
}

Subscribers:

#include <stdio.h>      //for printf and getchar
#include <sys/sem.h>    //for systemV semaphores
int main()
{
    key_t key = (key_t)0xfeededdf;     //should use ftok instead
    int semid = semget(key, 0, 0);     //get the existing semaphore
    printf("press enter to lock reader semaphore:");
    getchar();
    struct sembuf sb[2];      //2 operations: 1.wait-for-writer 2.lock-reader
    sb[0].sem_num = 0;        //0th : writer semaphore
    sb[0].sem_op =  0;        //wait for zero
    sb[0].sem_flg = 0;        //don't need to undo
    sb[1].sem_num = 1;        //1th : reader semaphore
    sb[1].sem_op = -1;        //lock - decrement by 1
    sb[1].sem_flg = SEM_UNDO; //undo when process terminates
    semop(semid, sb, 2);      //2 denotes 2 operations (nsops=2)
    //read shared memory (skipped)
    printf("press enter to unlock reader semaphore and exit:");
    getchar();
    sb[0].sem_num = 1;        //1: reader sem. no need to wait for writer
    sb[0].sem_op  = 1;        //unlock: increment by 1
    sb[0].sem_flg = SEM_UNDO; //it might feel weird to undo the unlocking. but
                              //it is necessary. read about semadj structure.
    semop(semid, sb, 1);      //sb[1] will be ignored as nsops=1
    return 0;
}

Source: Chapter 15 (interprocess communication) in Stevens book

P.S: I have omitted return value checking (for errors) to keep the code small.

Foo Bar
  • 56
  • 5