3

I have a programming question regarding C on Linux. The code shown here is a 'snapshot' of a larger server I'm trying to code.

To explain, I want to:

  1. fork() multiple children processes in server to handle requests generated by parent process or a client or whoever
  2. Each child process has several producer (writer) threads and 1 consumer (reader) thread
  3. Ring buffer of pointers hold info written to and read by threads
  4. Producer threads retrieve a string from a linked list (node->string) and 'push' node->string to a ring buffer of pointers read by consumer
  5. Consumer thread 'pops' from ring buffer and writes to some stream (file, stdout, etc)
  6. I use POSIX semaphores to try and synchronize access to the Ring Buffer between threads (and here is probably the problem)

My Ring Buffer does not work. I'm not sure if I coded the buffer poorly or if the semaphores aren't working as I hoped.

I included 'server.c' and 'circbuff.c' there is also a linkedlist.h library but I'm confident that it works

POSSIBLE PROBLEMS?

  1. I declare buffer_t buffer (ring buffer) globally. I do not malloc it.

  2. ????

Anyone who would could help would really be appreciated.

Recommendations on how to get threads to talk with a linked list or ring buffer are most welcome.

Output is always different, but it always pushes more to the ring buffer than it pops.

OUTPUT: ..format = 'PUSH: child line# thread'

PUSH: 2264 line 0      thread: 139746191480576
PUSH: 2264 line 0      thread: 139746183087872
PUSH: 2264 line 1      thread: 139746183087872
Buffer underflow
pop 0  
Buffer underflow
pop 0  
Buffer underflow
pop 0  
PUSH: 2275 line 0      thread: 139746191480576
PUSH: 2275 line 0      thread: 139746191480576

More OUTPUT

PUSH: 4208 line 0      thread: 140707316717312
PUSH: 4208 line 1      thread: 140707316717312
PUSH: 4208 line 2      thread: 140707316717312
PUSH: 4208 line 3      thread: 140707316717312
PUSH: 4208 line 4      thread: 140707316717312
PUSH: 4208 line 5      thread: 140707316717312
pop 4208 line 0    
pop 4208 line 0

SERVER.C

    #include <stdio.h>
    #include <string.h>
    #include <fcntl.h>
    #include <sys/shm.h>
    #include <sys/stat.h>
    #include <sys/mman.h>
    #include <unistd.h>
    #include <sys/types.h>
    #include <semaphore.h>
    #include <dirent.h>
    #include <sys/wait.h>
    #include <errno.h>
    
    #include "circbuff.h"
    #include "server.h"
    

    #define SNAME "/OS"
    
    #define MAXDIRPATH 1024
    #define MAXKEYWORD 256
    #define MAXLINESIZE 1024
    #define MAXOUTSIZE 2048


    sem_t * sem_wrt;
    sem_t * sem_empty;
    sem_t * sem_full;
    sem_t * sem_mutex;
    
    buffer_t buffer;             //a circular buffer pointer array that reads info from a linked list
    void * producer(void *arg);  //thread producer
    void * consumer(void *arg);  //thread consumer

    int main( int argc, char *argv[]){

    if (argc != 3){
        printf("Three arguments required .../client <arbitrary> <**buffer_size**>\n");
        exit(0);
        }
    
    int reqsize= atoi(argv[1]);
    int bbuff= atoi(argv[2]);
    //buffer_t *buf; 
    int qbuffsize= reqsize*(MAXDIRPATH+MAXKEYWORD);

    //char tok1[MAXDIRPATH];
    //char tok2[MAXKEYWORD]; 
    int child_num, status; 
    
      /*** Semaphore open */  
    sem_wrt=sem_open("/sem_wrt", O_CREAT, S_IRUSR | S_IWUSR, 1);
        if (sem_wrt == SEM_FAILED) {
        perror("server: sem_open failed for semaphone sem_wrt");
        return EXIT_FAILURE;
    }
    sem_empty=sem_open("/sem_empty", O_CREAT, S_IRUSR | S_IWUSR, bbuff); //semaphore waits for bbuff number of things to stop process
        if (sem_empty == SEM_FAILED) {
        perror("server: sem_open failed for semaphone sem_empty");
        return EXIT_FAILURE;
    }
    sem_full=sem_open("/sem_full", O_CREAT, S_IRUSR | S_IWUSR, 0);
        if (sem_full == SEM_FAILED) {
        perror("server: sem_open failed for semaphone sem_full");
        return EXIT_FAILURE;
    }
    sem_mutex=sem_open("/sem_mutex", O_CREAT, S_IRUSR | S_IWUSR, 1);
        if (sem_mutex == SEM_FAILED) {
        perror("server: sem_poen failed for semaphone sem_mutex");
        return EXIT_FAILURE;
        }


    /* Create 4 child processes (each will have 11 threads per below, 1 producer, 10 consumers) */
    pid_t pid;
    int ct;
    for(ct=0; ct<4; ct++){

        pid=fork();  //make 5 children
        child_num++;
    }
          



    if(pid==0){//IF CHILD PROCESS

        struct ThreadList* t_list=NULL;  //delcare a list of threads 
        t_list= create_list();           //create the list of  thread_info_nodes             
        
        init(&buffer, bbuff);   //initial circular buffer (declared GLOBALLY), which to multiple threads write, 1 thread reads
        
        pthread_t ctid;                                 //consumer thread id
        pthread_create(&ctid, NULL, consumer, t_list);  //create one consumer/writer/pop-list thread 
        
        
        /* make n_cnt producer threads (10 threads) that take node->string from a linked list and push to a ring buffer of pointers */
        int n_cnt; 
        for(n_cnt=0; n_cnt<10; n_cnt++){
            struct thread_info_node *tnode = malloc(sizeof(struct thread_info_node));//make a node on the heap
            init_tnode(tnode, n_cnt, NULL, " line ");                                //initialize node info ..a string
            insert_tail( tnode, t_list);                                             //insert node to tail of list
            pthread_create( &(tnode->tid), NULL, producer, t_list);                  //create a few producer threads that read list
        }
        
    
        
        //When child process is done, close semaphores to release resources used by child
        sem_close(sem_wrt);  
        sem_close(sem_empty);  
        sem_close(sem_full);   
        sem_close(sem_mutex);
    }
    
    else{// if the PARENT
        /*wait for child processes to finish*/
        int cnt;
        for(cnt=0; cnt < child_num ; cnt++){
            waitpid(-1, &status, 0);}
    

        sem_close(sem_wrt);  
        sem_close(sem_empty);  
        sem_close(sem_full);   //printf("close sem_full= %d\n", sem_close(sem_full)   );  
        sem_close(sem_mutex);
        
        
        sem_unlink("/sem_wrt"); 
        sem_unlink("/sem_empty");  
        sem_unlink("/sem_full"); //printf("unlink sem_full= %d\n", sem_unlink("/sem_full")   );  
        sem_unlink("/sem_mutex");
        
        }
    

return 0; 
}


void * producer(void *arg){

    struct ThreadList *list = arg;  
    struct thread_info_node *tmp = list->head;
    pthread_t tid= pthread_self(); 
    char thread_id[MAXLINESIZE]; 
  
  /* push all but last string from linked list*/ 
  // I AM PUSHING ONLY THE 'node->string' value, not the whole node.  IS THIS THE WR0NG WAY TO DO IT??? 
    while(tmp->next != NULL){
        
        sem_wait(sem_empty); //decrement empty. if 0 no more empty, stop process.  
        sem_wait(sem_mutex);
        
        /*** CRITICAL SECTION ***/
        snprintf(thread_id, 10,  "%lu", tid);
        //strcat(tmp->filename, thread_id);
        printf("PUSH: %s    thread: %lu\n", tmp->filename, tid);
        push( &buffer, tmp->filename );
        ///--------------------//
        
        sem_post(sem_mutex);
        sem_post(sem_full); 
        
        tmp= tmp->next;
    }
 
  /* push last string left in linked list, and an exit message for the consumer */  
    sem_wait(sem_empty); //decrement empty. if 0 no more slots empty, stop process.  
    sem_wait(sem_mutex);
    
    /*** CRITICAL SECTION ***/
    snprintf(thread_id, 10,  "%lu", tid);
    strcat(tmp->filename, thread_id);
    printf("PUSH: %s\n", tmp->filename);
    push( &buffer, tmp->filename );
    
    printf("SPECIAL_EXIT_CODE\n");
    push( &buffer, "SPECIAL_EXIT_CODE"); 
    ///--------------------//
    
    sem_post(sem_mutex);
    sem_post(sem_full);
    
    return NULL;    
}


void * consumer(void *arg){

//struct ThreadList *list = arg;    

char outline[MAXOUTSIZE]; 
    
    while(1){ 
            
     sem_wait(sem_full);
     sem_wait(sem_mutex); 
          /******* CRITICAL SECTION ******/ 

          //printf("==>CONSUMER THREAD\n");
          strcpy(outline, popqueue(&buffer)); 
          printf("pop %s  \n",  outline );
          
          if (strcmp(outline,"SPECIAL_EXIT_CODE")==0) break;
         
          
          /********************************/
      sem_post(sem_mutex);
      sem_post(sem_empty); 
        
         
    }//end WHILE*/
    
    return NULL;
}

CIRCBUFF.H

#ifndef __CIRCBUFF_h__
#define __CIRCBUFF_h__


#include <stdio.h>
#include <stdlib.h>

#define MAXDIRPATH 1024
#define MAXKEYWORD 256
#define MAXLINESIZE 1024
#define MAXOUTSIZE 2048
 
 
struct buffer {
    int size;
    int start;
    //int end;  // position of last element
    /* Tracking start and end of buffer would waste
     * one position. A full buffer would always have
     * to leave last position empty or otherwise
     * it would look empty. Instead this buffer uses
     * count to track if buffer is empty or full
     */
    int count; // number of elements in buffer
    /* Two ways to make buffer element type opaque
     * First is by using typedef for the element
     * pointer. Second is by using void pointer.
     */
    /* different types of buffer: 
    int *element;   // array of integers
    char *element;  // array of characters 
    void *element;  // array of void type (could cast to int, char, etc)
    char **element; //array of char pointers (array of strings)
    void **element; // array of void pointers
    Choosing array of void pointers since it's the most flexible */
    void **element;

};
 
typedef struct buffer buffer_t;
 
void init(buffer_t *buffer, int size) {
    buffer->size = size;
    buffer->start = 0;
    buffer->count = 0;
    //buffer->element = malloc(sizeof(buffer->element)*size);
    /* allocated array of void pointers. Same as below */
    //buffer->element = malloc(sizeof(void *) * size);
    
    /* Allocate array of char poitners*/
    buffer->element = malloc(sizeof(char **) * size);
     
}
 
int full(buffer_t *buffer) {
    if (buffer->count == buffer->size) { 
        return 1;
    } else {
        return 0;
    }
}
 
int empty(buffer_t *buffer) {
    if (buffer->count == 0) {
        return 1;
    } else {
        return 0;
    }
}
     
void push(buffer_t *buffer, void *data) {
    int index;
    if (full(buffer)) {
        printf("Buffer overflow\n");
    } else {
        index = buffer->start + buffer->count; 
        if (index >= buffer->size) {
            index = 0;
            //index= index - buffer->size;//6 
        }
        buffer->element[index] = data;
        buffer->count++;
    }
}
 
 
void * popqueue(buffer_t *buffer) {
    void * element;
    if (empty(buffer)) {
        printf("Buffer underflow\n");
        return "0";
    } else {
       /* FIFO implementation */
       element = buffer->element[buffer->start];
       buffer->start++;
       buffer->count--;
       if (buffer->start == buffer->size) {
           buffer->start = 0;
       }
        
       return element;
    }
}
 
void * popstack(buffer_t *buffer) {
    int index;
    if (empty(buffer)) {
        printf("Buffer underflow\n");
        return "0";
    } else {
        /* LIFO implementation */
        index = buffer->start + buffer->count - 1;
        if (index >= buffer->size) {
           index = buffer->count - buffer->size - 1;
           //index= index-buffer->size
           //index = buffer->count – buffer->size – 1;
        }      
        buffer->count--;
        return buffer->element[index];
    }
}
   
#endif
marc_s
  • 732,580
  • 175
  • 1,330
  • 1,459
MattBorg
  • 95
  • 1
  • 1
  • 9
  • tldr; post a minimal example. – Fantastic Mr Fox Apr 05 '17 at 04:03
  • i tried to reduce it. the original server is *much larger – MattBorg Apr 05 '17 at 04:06
  • i know its a bit large, but any help would be greatly appreciated. I was specifically wondering if declaring a global struct which holds a buffer for thread communication is bad or not. – MattBorg Apr 05 '17 at 04:07
  • @MattBorg about good-bad global structs, take a look here http://stackoverflow.com/questions/484635/are-global-variables-bad – CIsForCookies Apr 05 '17 at 04:42
  • Step 1: make sure your ring buffer works single-threaded. If that's not working, you're hosed. Write test code specifically for that. I assume you've already done test code for the list which is why you assert that is OK — if not, test that, too. Step 2: Do you need semaphores or mutexes for the threads in the server? Devise a way of testing whether your semaphore code is working without using the ring buffer. Or, if that's not feasible, print lots of diagnostic information — or get good at debugging multithreaded programs (or both). Please read about creating an MCVE ([MCVE]). – Jonathan Leffler Apr 05 '17 at 05:08
  • ty Jonathan. I'll test it with one thread, thats a good idea ..should've done that. I think the problem may be that i declared my 'ring-buffer' struct as a global struct; I wanted all threads of a process to see it, but i already passed a linked list via the arg for pthread(). Methinks the problem is that, yes all threads of a process can see it ...but i'm thinking all processes see it as well. Im trying to implement a buffer for each process/thread combo ..not a buffer across proceses. .........any thoughts from anyone on this would be most appreciated. – MattBorg Apr 05 '17 at 05:26
  • ..bah ..just read that fork() creates copys for children ...i just dunno whats wrong. – MattBorg Apr 05 '17 at 05:41
  • ran it with 1 thread per child as suggested. ..i'm "hosed" as suggested above. – MattBorg Apr 05 '17 at 06:01
  • don't give up, just make it work :-) – Mathieu Borderé Apr 05 '17 at 06:50

2 Answers2

0

The problem is that fork() do not share global variables. Child and parent processes have their own copies of buffer. You should use shared memory for this purpose.

Yuri Uvarov
  • 1
  • 1
  • 2
  • The idea i'm trying for is that each child *DOES have its own buffer. Buffer is not to be shared among children. Each child will have several write and 1 read threads which communicate inside the process with their own buffer. Then each writer thread will eventually write all info to a log using a file lock. – MattBorg Apr 05 '17 at 21:23
  • I read that threads can see info from each other in 1 of two ways.l Either a global variable or memory declared on the heap. From what i understand the stack for each thread can only be accessed by said thread. – MattBorg Apr 05 '17 at 21:24
0

for those unfortunate enough to come here for answers, i've managed to outline a solution. I fixed two things in my main server

  1. i got rid of the global buffer, ..just risky business. The initial problem is that when pthread_create is called, only 1 arg can be passed (see man pages). I needed alot of info to pass, so i made a 'super-control-struct' consisting of the inter-thread-ring-buffer and all info needed for thread operations and passed that

  2. Semaphores and loops can be tricky.

  3. my basic approach *seems to work. many producer threads process info, push to buffer, 1 writer thread per child writes to a log.txt. Use a ring buffer on the heap to reference a linked list on same heap.

..now i tackle the memory leaks and im done.

MattBorg
  • 95
  • 1
  • 1
  • 9