I am trying to implement producer-consumer problem using a two-process scenario.
Process1 — Producer, and Process2 — Consumer. The consumer process is waiting on a condition variable (pthread_cond_wait(cond)
) and producer will send a signal to consumer via pthread_cond_signal(cond)
.
I have gone through these links shared mutex and condition variable across process pthread_mutexattr_setpshared
, everywhere it was said it is possible to use mutex and condition variable across multiple processes,
pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED);
pthread_condattr_setpshared(&condAttr, PTHREAD_PROCESS_SHARED);
In this link shared mutexes, as void's recommendation, it was said to check whether my system supports or not? I have checked and I got 200809 as return value of sysconf(_SC_THREAD_PROCESS_SHARED)
which denotes my system supports PTHREAD_PROCESS_SHARED
.
I am trying to send pthread_cond_signal
from producer (process-1) to consumer (Process-2). Both producer and consumer are using the same mutex/condition variable initialized.
However, consumer is not receiving the signal. It seems either signal is not sent or it is lost.
Where I am making mistakes? I am using Ubuntu, gcc-4.6.3.
Here is my code.
Producer.c:
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/wait.h>
#include <pthread.h>
#include <sched.h>
#include <syscall.h>
#include <sys/stat.h>
#include <errno.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <stdbool.h>
pthread_cond_t* condition;
pthread_mutex_t* mutex;
#define OKTOWRITE "/oktowrite"
#define MESSAGE "/message"
#define MUTEX "/lock"
struct shared_use_st
{
bool conditionSatisfied;
};
struct shared_use_st *shared_stuff;
void create_shared_memory()
{
int shmid;
void *shared_memory=(void *)0;
shmid =shmget( (key_t)1234, 4096, 0666 | IPC_CREAT );
if (shmid == -1)
{
fprintf(stderr,"shmget failed\n");
exit(EXIT_FAILURE);
}
shared_memory =shmat(shmid, (void *)0,0);
if(shared_memory == (void *)-1)
{
fprintf(stderr,"shmat failed\n");
exit(EXIT_FAILURE);
}
shared_stuff = (struct shared_use_st *)shared_memory;
}
int main()
{
int des_cond, des_msg, des_mutex;
int mode = S_IRWXU | S_IRWXG;
des_mutex = shm_open(MUTEX, O_CREAT | O_RDWR | O_TRUNC, mode);
if (des_mutex < 0) {
perror("failure on shm_open on des_mutex");
exit(1);
}
if (ftruncate(des_mutex, sizeof(pthread_mutex_t)) == -1) {
perror("Error on ftruncate to sizeof pthread_cond_t\n");
exit(-1);
}
mutex = (pthread_mutex_t*) mmap(NULL, sizeof(pthread_mutex_t),PROT_READ | PROT_WRITE, MAP_SHARED, des_mutex, 0);
if (mutex == MAP_FAILED ) {
perror("Error on mmap on mutex\n");
exit(1);
}
des_cond = shm_open(OKTOWRITE, O_CREAT | O_RDWR | O_TRUNC, mode);
if (des_cond < 0) {
perror("failure on shm_open on des_cond");
exit(1);
}
if (ftruncate(des_cond, sizeof(pthread_cond_t)) == -1) {
perror("Error on ftruncate to sizeof pthread_cond_t\n");
exit(-1);
}
condition = (pthread_cond_t*) mmap(NULL, sizeof(pthread_cond_t),PROT_READ | PROT_WRITE, MAP_SHARED, des_cond, 0);
if (condition == MAP_FAILED ) {
perror("Error on mmap on condition\n");
exit(1);
}
/* set mutex shared between processes */
pthread_mutexattr_t mutexAttr;
pthread_mutexattr_init(&mutexAttr);
pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(mutex, &mutexAttr);
/* set condition shared between processes */
pthread_condattr_t condAttr;
pthread_condattr_init(&condAttr);
pthread_condattr_setpshared(&condAttr, PTHREAD_PROCESS_SHARED);
pthread_cond_init(condition, &condAttr);
create_shared_memory();
shared_stuff->conditionSatisfied=0;
int count=0;
while(count++<10)
{
pthread_mutex_lock(mutex);
shared_stuff->conditionSatisfied=1;
pthread_mutex_unlock(mutex);
pthread_cond_signal(condition);
printf("signal sent to consumer, %d\n",count);
sleep(3);
}
pthread_condattr_destroy(&condAttr);
pthread_mutexattr_destroy(&mutexAttr);
pthread_mutex_destroy(mutex);
pthread_cond_destroy(condition);
shm_unlink(OKTOWRITE);
shm_unlink(MESSAGE);
shm_unlink(MUTEX);
return 0;
}
Consumer.c:
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/wait.h>
#include <pthread.h>
#include <sched.h>
#include <syscall.h>
#include <sys/stat.h>
#include <errno.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <stdbool.h>
pthread_cond_t* condition;
pthread_mutex_t* mutex;
#define OKTOWRITE "/oktowrite"
#define MESSAGE "/message"
#define MUTEX "/lock"
struct shared_use_st
{
bool conditionSatisfied;
};
struct shared_use_st *shared_stuff;
void create_shared_memory()
{
int shmid;
void *shared_memory=(void *)0;
shmid =shmget( (key_t)1234, 4096, 0666 | IPC_CREAT );
if (shmid == -1)
{
fprintf(stderr,"shmget failed\n");
exit(EXIT_FAILURE);
}
shared_memory =shmat(shmid, (void *)0,0);
if(shared_memory == (void *)-1)
{
fprintf(stderr,"shmat failed\n");
exit(EXIT_FAILURE);
}
shared_stuff = (struct shared_use_st *)shared_memory;
}
int main()
{
int des_cond, des_msg, des_mutex;
int mode = S_IRWXU | S_IRWXG;
des_mutex = shm_open(MUTEX, O_CREAT | O_RDWR | O_TRUNC, mode);
if (des_mutex < 0) {
perror("failure on shm_open on des_mutex");
exit(1);
}
if (ftruncate(des_mutex, sizeof(pthread_mutex_t)) == -1) {
perror("Error on ftruncate to sizeof pthread_cond_t\n");
exit(-1);
}
mutex = (pthread_mutex_t*) mmap(NULL, sizeof(pthread_mutex_t),PROT_READ | PROT_WRITE, MAP_SHARED, des_mutex, 0);
if (mutex == MAP_FAILED ) {
perror("Error on mmap on mutex\n");
exit(1);
}
des_cond = shm_open(OKTOWRITE, O_CREAT | O_RDWR | O_TRUNC, mode);
if (des_cond < 0) {
perror("failure on shm_open on des_cond");
exit(1);
}
if (ftruncate(des_cond, sizeof(pthread_cond_t)) == -1) {
perror("Error on ftruncate to sizeof pthread_cond_t\n");
exit(-1);
}
condition = (pthread_cond_t*) mmap(NULL, sizeof(pthread_cond_t),PROT_READ | PROT_WRITE, MAP_SHARED, des_cond, 0);
if (condition == MAP_FAILED ) {
perror("Error on mmap on condition\n");
exit(1);
}
/* set mutex shared between processes */
pthread_mutexattr_t mutexAttr;
pthread_mutexattr_init(&mutexAttr);
pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(mutex, &mutexAttr);
/* set condition shared between processes */
pthread_condattr_t condAttr;
pthread_condattr_init(&condAttr);
pthread_condattr_setpshared(&condAttr, PTHREAD_PROCESS_SHARED);
pthread_cond_init(condition, &condAttr);
create_shared_memory();
shared_stuff->conditionSatisfied=0;
while(1)
{
printf("Receiver waits on for signal from hello1.c \n");
pthread_mutex_lock(mutex);
while(!shared_stuff->conditionSatisfied)
pthread_cond_wait(condition, mutex);
pthread_mutex_unlock(mutex);
printf("Signal received, wake up!!!!!!!!\n");
//reset
pthread_mutex_lock(mutex);
shared_stuff->conditionSatisfied=0;
pthread_mutex_unlock(mutex);
}
}