
I was trying to write code with multiple producers and consumers. I created multiple threads for the producers and consumers and used semaphores for synchronization. The code worked fine with a single producer and a single consumer.

The problem I am facing is that after the program has been running for a while, only consumer 1 and producer 1 are participating in the process. I cannot understand what happened to the other producers and consumers.

I would also like to know how to make the multi-producer/consumer solution efficient, in the sense that every producer and consumer gets an equal opportunity to produce and consume, respectively. C++ code (it includes a lot of C):

```cpp
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <queue>
// explicit using-declarations: 'using namespace std' makes 'empty' ambiguous with std::empty in C++17
using std::cout;
using std::endl;
using std::queue;
sem_t empty;
sem_t full;
int cnt = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
queue<int> q;
void *producer(void *a)
{
    int *num = (int *)a;
    while(1) {
        sem_wait(&empty);
        pthread_mutex_lock(&mutex);
        cnt = cnt+1;
        q.push(cnt);
        cout<<cnt<<" item produced by producer "<<(*num+1)<<endl;
        pthread_mutex_unlock(&mutex);
        sem_post(&full);
        sleep(1);
    }
}
void *consumer(void *a)
{
    int *num = (int *)a;
    while(1) {
        sem_wait(&full);
        pthread_mutex_lock(&mutex);
        cout<<q.front()<<" item consumed by consumer "<<(*num+1)<<endl;
        q.pop();
        pthread_mutex_unlock(&mutex);
        sem_post(&empty);
        sleep(1);
    }
}
int main()
{
    pthread_t p[5];
    pthread_t c[5];
    sem_init(&empty,0,5);
    sem_init(&full,0,0);
    int i;
    for(i = 0; i < 5; i++) {
        pthread_create(&p[i],NULL,producer,(void *)(&i));
    }
    for(i = 0; i < 5; i++) {
        pthread_create(&c[i],NULL,consumer,(void *)(&i));
    }
    for(i = 0; i < 5; i++) {
        pthread_join(p[i],NULL);
        pthread_join(c[i],NULL);
    }
}
```

Updated code:

```cpp
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <queue>
#include <map>
// explicit using-declarations: 'using namespace std' makes 'empty' ambiguous with std::empty in C++17
using std::cout;
using std::endl;
using std::map;
using std::queue;
sem_t empty;
sem_t full;
int cnt = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
map<pthread_t,int> mc,mp;
queue<int> q;
void *producer(void *a)
{
    while(1) {
        sem_wait(&empty);
        pthread_mutex_lock(&mutex);
        cnt = cnt+1;
        q.push(cnt);
        cout<<cnt<<" item produced by producer "<<mp[pthread_self()]<<endl;
        pthread_mutex_unlock(&mutex);
        sem_post(&full);
        sleep(1);
    }
}
void *consumer(void *a)
{
    while(1) {
        sem_wait(&full);
        pthread_mutex_lock(&mutex);
        cout<<q.front()<<" item consumed by consumer "<<mc[pthread_self()]<<endl;
        q.pop();
        pthread_mutex_unlock(&mutex);
        sem_post(&empty);
        sleep(1);
    }
}
int main()
{
    pthread_t p[5];
    pthread_t c[5];
    sem_init(&empty,0,5);
    sem_init(&full,0,0);
    int i;
    pthread_mutex_lock(&mutex);
    for(i = 0; i < 5; i++) {
        pthread_create(&p[i],NULL,producer,NULL);
        pthread_create(&c[i],NULL,consumer,NULL);
        mc[c[i]] = i+1;
        mp[p[i]] = i+1;
    }
    pthread_mutex_unlock(&mutex);
    for(i = 0; i < 5; i++) {
        pthread_join(p[i],NULL);
        pthread_join(c[i],NULL);
    }
}
```
Shivam Mitra
  • Take look at the argument you are passing to pthread_create and how you are using that information in the thread functions. Consider if there is a race condition. Consider the impact of passing `i` versus passing `&i`. – David Thomas Jul 03 '16 at 15:03
  • Any reason that justifies the use of OS specific `pthread` instead of the standard `std::thread` ? – Christophe Jul 03 '16 at 15:04
  • @Christophe: Since I have not learned threads in C++, I used the C version. – Shivam Mitra Jul 03 '16 at 15:08
  • @DavidThomas: How can a race condition occur when I am using a mutex to allow only one thread at a time into the critical region? – Shivam Mitra Jul 03 '16 at 15:10
  • @ShivamMitra All 10 of your threads receive a pointer to the same memory location `i`. This memory location is being changed while the threads are being created. There is a race condition between the threads reading the memory location and main() modifying it. If you explicitly pass the value of i to each thread (and not a pointer to i), you will not be sharing a memory location, which eliminates the race. – David Thomas Jul 03 '16 at 17:56
  • @DavidThomas: Thanks, I got it. – Shivam Mitra Jul 03 '16 at 17:56

1 Answer


Short answer

The threads do in fact execute with equal opportunity; they just print out an identifier that is not their own.

Detailed explanation

Each thread keeps a pointer `num` to the thread number. It is the pointer that is saved, not the value itself, so all the threads point to the same counter, each expecting to find its own identifier there.

Every time you access `*num`, you get not the value that `i` had when you launched the thread, but its current value.

Unfortunately, `main()` reuses the variable `i` in every loop. In the last loop you set `i` back to 0 and wait for the first threads to join. But all these threads loop forever, so the first `pthread_join` never returns and that loop never gets past its initial value of 0. As a result, every thread computes `*num+1` as 1 and reports itself as number 1.

Note also that, as someone pointed out in the comments, this creates a race condition. All the consumer and producer threads dereference the pointer and read the same variable inside a mutex-protected region, which is fine among themselves. But while they are reading it, the main thread can still freely change that shared variable outside of any lock. That is definitely a data race.

Workaround

`std::thread` would allow you to pass `i` by value, so that each thread gets its own unaltered copy of its id.

With pthreads you have to pass a pointer. Unfortunately, even if each thread made a local copy of the pointed-to value right at its start, you would still have a race condition, because `main()` may already have changed `i` before the thread gets to copy it.

A quick workaround to see which thread is really doing the work would be to also print the result of `pthread_self()` (see here for how to do it). Another option is to store the ids in an array of `int` and pass each thread the address of its own unique element in that array.

Christophe
  • How will `pthread_join` change the initial value? I don't get it. – Shivam Mitra Jul 03 '16 at 15:19
  • @ShivamMitra `pthread_join` doesn't change anything, but you call it in a loop that starts with `for (i=0; `, and unfortunately it is the same `i` that all the `num` pointers are pointing to. – Christophe Jul 03 '16 at 15:24
  • Any suggestion on how to print which thread has produced or consumed? – Shivam Mitra Jul 03 '16 at 15:36
  • @ShivamMitra it's in my edit (including hyperlink on how to print a `pthread_t`) – Christophe Jul 03 '16 at 15:38
  • I have updated the code. Could you check whether the race condition still exists? I think it does, but the output doesn't show any sign of it. – Shivam Mitra Jul 04 '16 at 09:58
  • @ShivamMitra the race condition still exists on the maps you are using: threads read the maps while the main thread updates them outside a lock. In the worst case, a thread could look up a value that is not yet in the map while it is being inserted and the map is in an inconsistent state. This could even lead to a segfault. Solution: lock the mutex around each loop in main; this prevents the consumers/producers from entering the critical region before the map is completely updated. – Christophe Jul 04 '16 at 12:05
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/116372/discussion-between-shivam-mitra-and-christophe). – Shivam Mitra Jul 04 '16 at 12:25