0
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <wait.h>

int item_to_produce, curr_buf_size;
int total_items, max_buf_size, num_workers, num_masters;
int consumed_items;
int *buffer;
pthread_mutex_t mutex;
pthread_cond_t  has_data;
pthread_cond_t  has_space;



void print_produced(int num, int master) {

  printf("Produced %d by master %d\n", num, master);

}

void print_consumed(int num, int worker) {

  printf("Consumed %d by worker %d\n", num, worker);

}

//consume items in buffer
void *consume_requests_loop(void *data)
{
  int thread_id = *((int *)data);

  while(1)
  {
    pthread_mutex_lock(&mutex); // mutex lock for consume
    if(consumed_items == total_items) {
       pthread_mutex_unlock(&mutex);
       break;
     }     

    while(curr_buf_size == 0) {
      pthread_cond_wait(&has_data, &mutex);
    }

    print_consumed(buffer[(curr_buf_size--)-1], thread_id);
    consumed_items++;

    pthread_cond_signal(&has_space);
    pthread_mutex_unlock(&mutex);
  }

  return 0;
}


//produce items and place in buffer
//modify code below to synchronize correctly
void *generate_requests_loop(void *data)
{
  int thread_id = *((int *)data);

  while(1) {

      pthread_mutex_lock(&mutex); // mutex lock for consume

      //all of items are produced
      //master threads need to join
      if(item_to_produce == total_items) {
        pthread_mutex_unlock(&mutex);
        break;
      }

      //there is no item to read
      while (curr_buf_size == max_buf_size) {

        pthread_cond_wait(&has_space, &mutex);

      }

      buffer[curr_buf_size++] = item_to_produce;
      print_produced(item_to_produce, thread_id);
      item_to_produce++;
      pthread_cond_signal(&has_data);
      pthread_mutex_unlock(&mutex); // mutex_produce unlock
    }

  return 0;
}



//write function to be run by worker threads
//ensure that the workers call the function print_consumed when they consume an item
int main(int argc, char *argv[])
{
  int *master_thread_id; // array of master_thread_id
  int *worker_thread_id; // array of worker_thread_id  
  pthread_t *master_thread; // array of master_thread
  pthread_t *worker_thread; // array of worker_thread

  item_to_produce = 0; // item will be produced by master_thread at next time
  curr_buf_size = 0; // index of item will be saved in
  consumed_items = 0;
  
  int i;  

   if (argc < 5) {

    printf("./master-worker #total_items #max_buf_size #num_workers #masters e.g. ./exe 10000 1000 4 3\n");
    exit(1);
  }
  else {

    num_masters = atoi(argv[4]);
    num_workers = atoi(argv[3]);
    total_items = atoi(argv[1]);
    max_buf_size = atoi(argv[2]);
  }

   buffer = (int *)malloc (sizeof(int) * max_buf_size);

   pthread_mutex_init(&mutex, NULL);
   pthread_cond_init(&has_space, NULL);
   pthread_cond_init(&has_data, NULL);

   //create master producer threads

   master_thread_id = (int *)malloc(sizeof(int) * num_masters);
   master_thread = (pthread_t *)malloc(sizeof(pthread_t) * num_masters);

  for (i = 0; i < num_masters; i++)
    master_thread_id[i] = i;

  for (i = 0; i < num_masters; i++)
    pthread_create(&master_thread[i], NULL, generate_requests_loop, (void *)&master_thread_id[i]);

  //create worker consumer threads
  worker_thread_id = (int *)malloc(sizeof(int) * num_workers);
  worker_thread = (pthread_t *)malloc(sizeof(pthread_t) * num_workers);

  for (i = 0; i < num_workers; i++)
    worker_thread_id[i] = i;

  for (i = 0 ; i < num_workers; i++)
    pthread_create(&worker_thread[i], NULL, consume_requests_loop, (void *)&worker_thread_id[i]);

  

  //wait for all threads to complete
  for (i = 0; i < num_masters; i++)
    {
      pthread_join(master_thread[i], NULL);
      printf("master %d joined\n", i);
    }

  for (i = 0; i < num_workers; i++)
    {
      pthread_join(worker_thread[i], NULL);
      printf("worker %d joined\n", i);
    }  

  /*----Deallocating Buffers---------------------*/

  free(buffer);
  free(master_thread_id);
  free(master_thread);
  free(worker_thread_id);
  free(worker_thread);
  pthread_mutex_destroy(&mutex);
  pthread_cond_destroy(&has_data);
  pthread_cond_destroy(&has_space);

  return 0;

}

This code produces numbers in a range given through the command-line arguments and then consumes them.

However, the producers produce numbers outside the range and do not stop (so that they can be joined) when the termination condition is met. The consumers have the same problem.

For example, when I give a range of numbers such as 0~39 (total_item = 500), a buffer size of 30 (max_buf_size), num_workers 5 and num_master 3, it does not limit itself to producing and consuming only the numbers 0~39.

It also produces and consumes numbers greater than or equal to 40.

Jonathan Leffler
  • 730,956
  • 141
  • 904
  • 1,278
Genie Kim
  • 1
  • 1
  • 1
    There is no wait operation of any kind in this, so there *must* be pure spinning. Any particular reason you're averse to a condition variable ? (or a semaphore, but I prefer the former as a matter of taste and portability). The only thing stopping this from hard-spinning is acquiring the mutex. The back-to-the-beginning code path is spin: lock, check, unlock, relock, check, unlock relock, check, etc... – WhozCraig Nov 29 '20 at 00:35
  • @WhozCraig Semaphore is more like a mutex (a binary semaphore), condition variable and a counter, rather than a stateless condition variable. https://stackoverflow.com/questions/4792449/c0x-has-no-semaphores-how-to-synchronize-threads/4793662#4793662 – Maxim Egorushkin Nov 29 '20 at 00:51
  • @Genie It seems you just provided Consumer's code, If you provide Producer's code question is easier to understand. By `curr_buffer_size` you probably mean `curr_buffer_index` because size is usually fixed. I also suggest you to provide little description about your code variables specially not used ones: `num_masters`, `num_workers`, `max_buff_size` – muradin Nov 29 '20 at 01:44
  • @muradin Can you check it once if I upload the whole code? I thought whole code is a little long. So I just uploaded consumer part. – Genie Kim Nov 29 '20 at 01:56
  • You may find [this](https://coliru.stacked-crooked.com/a/bae31ae8f00fd65f) interesting. It's pretty basic stuff, but might give you some ideas. – WhozCraig Nov 29 '20 at 03:28
  • @WhozCraig Thx! I solved it. :) It was a good resource that was easy to understand. – Genie Kim Nov 29 '20 at 18:49

1 Answer

0

Written that way, the thread spins in a busy loop. To put the thread to sleep instead, you can use, for example, condition variables. (For more information, see https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_cond_wait.html)