0

UPDATE: I can't figure out why I am getting stuck in an infinite loop. I can avoid it by adding an exit condition for when the queue reaches its maximum size, but I want to do it by keeping a file counter (fileCounter) that exits the while loop once the code has handled all of the input files.

Earlier today I had the code running correctly, but while making some changes to handle multiple threads (more than one producer and more than one consumer) I seem to have broken it along the way. I know I should have kept a copy before I changed it, but I thought it would be easy to retrace my steps. My ultimate goal is to make it safe for several producer and consumer threads. I include the header and .c file below.

My shared buffer is a queue, and the first three functions in my .c file initialize the queue, push to it, and pop from it. I use a mutex (`lock`) and a condition variable (`cond`) to control when producer threads push to the queue or wait, and when consumer threads pop from it. In my requesterThread and resolverThread functions I use three semaphores (sem_mutex, sem_full, sem_empty) — one binary and two counting — that keep track of how many slots in my queue are empty and how many are full. I will eventually have to manage three shared resources, but with one producer thread and one consumer thread the only shared resource is the queue.

Any help with this problem and perhaps some tips on how to work on making my code safe for several threads would be greatly appreciated.

Thank you

.h file

#ifndef MULTILOOKUP_H_
#define MULTILOOKUP_H_

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>

/* NOTE: including a .c file by absolute path ties the build to one machine;
 * prefer including util.h and linking util.o. */
#include "/home/user/Desktop/PA3/util.c"

/* Upper limit on requester (producer) threads; main() refuses larger counts. */
#define MAX_REQUESTER_THREADS 10
/* Upper limit on resolver (consumer) threads; main() refuses larger counts. */
#define MAX_RESOLVER_THREADS 5
/* Upper limit on input files. NOTE(review): not referenced in the code shown here. */
#define MAX_INPUT_FILES 10
/* Capacity of the shared queue: push() blocks while size == MAXSIZE, and
 * main() starts sem_empty with this many free slots. */
#define MAXSIZE 20

/*
 * Argument bundle handed to the requester and resolver threads.
 * main() passes the address of ONE instance to every thread it creates, so
 * every field (including the scratch buffers) is shared between threads.
 * NOTE(review): with more than one requester or more than one resolver the
 * line/result/ipv6 buffers are written concurrently and will race.
 */
struct arguments {
        struct queue *q;        /* the shared hostname queue */
        char *input;            /* file requesterThread() reads hostnames from */
        char *resLog;           /* file resolverThread() writes results to */
        char *reqLog;           /* file requesterThread() logs queued hostnames to */
        char line[100];         /* requester scratch: the line just read */
        char result[100];       /* resolver scratch: the hostname just popped */
        char ipv6[100];         /* resolver scratch: address filled in by dnslookup() */
        char *poisonPill;       /* sentinel the requester queues after its last file; the resolver stops on it */
        int numInputFiles;      /* requester queues the pill once its file counter reaches this */
};

/* One element of the singly linked hostname queue. */
struct node {
    char name[100];     /* hostname copied in by push() with strcpy(), so the source must fit in 99 chars + NUL */
    struct node *next;  /* next node toward the tail; expected to be NULL for the last node */
};

/* FIFO of hostnames shared between the requester and resolver threads. */
struct queue {
    int size;               /* element count, incremented by push() and decremented by pop() */
    struct node *head;      /* oldest element; pop() removes from here */
    struct node *tail;      /* newest element; push() appends after it */
    pthread_mutex_t lock, lock2; /* lock is taken by push() and pop(); lock2 is initialised in init() but not locked in the code shown */
    pthread_cond_t cond;    /* push() waits on it while size == MAXSIZE; pop() broadcasts on it */
};

/* Put q into the empty state and initialise its mutexes and condition variable. */
void init(struct queue *q);
/* Remove the element at the head of q and free it. */
void pop(struct queue *q);
/* Append a copy of name at the tail of q, waiting while q is full. */
void push(struct queue *q, char *name);
/* Producer thread entry point; receivedStruct is a struct arguments *. */
void* requesterThread(void *receivedStruct);
/* Consumer thread entry point; receivedStruct is a struct arguments *. */
void* resolverThread(void *receivedStruct);


/*
 * Semaphores coordinating producers and consumers (initialised in main()):
 *   sem_mutex - binary, starts at 1: serialises access to the queue
 *   sem_full  - counting, starts at 0: number of queued hostnames
 *   sem_empty - counting, starts at MAXSIZE: number of free queue slots
 * NOTE: this line DEFINES the objects. Include this header from only one
 * translation unit, or change it to an `extern` declaration and define the
 * semaphores in exactly one .c file.
 */
sem_t sem_mutex, sem_full, sem_empty;



#endif

.c file

#include "multilookup.h"

/*----------------------Struct Functions------------------------*/
/*
 * init - reset q to an empty list and create its synchronisation objects
 * (two mutexes and one condition variable, all with default attributes).
 * Call it once, before the queue is shared between threads.
 */
void init(struct queue *q) {
    pthread_cond_init(&q->cond, NULL);
    pthread_mutex_init(&q->lock2, NULL);
    pthread_mutex_init(&q->lock, NULL);

    q->size = 0;
    q->tail = NULL;
    q->head = q->tail;
}

void pop(struct queue *q)
{
    pthread_mutex_lock(&q->lock);

    if (q->size > 0) {pthread_cond_broadcast(&q->cond);}
  
    q->size--;
    //printf("%s\n", q->head->name); 
    
    struct node *tmp = q->head;
    q->head = q->head->next;
    free(tmp);
  
    pthread_mutex_unlock(&q->lock);
    
}

void push(struct queue *q, char *name)
{
    pthread_mutex_lock(&q->lock);
    while (q->size == MAXSIZE) {
        pthread_cond_wait(&q->cond, &q->lock);
    }
 
    q->size++;
     
    if (q->head == NULL) {
        q->head = (struct node *)malloc(sizeof(struct node));
        //q->head->name = name;
        strcpy(q->head->name, name);
        q->head->next == NULL;
        q->tail = q->head;
    } else {
        q->tail->next = (struct node *)malloc(sizeof(struct node));
        //q->tail->next->name = name;
        strcpy(q->tail->next->name, name);
        q->tail->next->next = NULL;
        q->tail = q->tail->next;
    }   
    
    pthread_mutex_unlock(&q->lock);
}
/*--------------------------End of struct functions------------------*/

/*
 * read_hostname - read the next non-empty line of fp into buf (size bytes).
 *
 * Strips a trailing "\n" or "\r\n", skips blank lines, and discards the rest
 * of any line too long for buf so it cannot spill into the next read.
 * Returns 1 when a hostname was stored in buf, 0 at end of file or on error.
 */
static int read_hostname(FILE *fp, char *buf, size_t size)
{
    while (fgets(buf, (int)size, fp) != NULL) {
        size_t len = strcspn(buf, "\r\n");

        if (buf[len] == '\0' && len + 1 == size) {
            /* The line filled buf without a newline: drop its remainder. */
            int c;
            while ((c = fgetc(fp)) != EOF && c != '\n') {
                /* discard */
            }
        }
        buf[len] = '\0';

        if (len > 0) {
            return 1;
        }
    }
    return 0;
}

/*
 * enqueue_hostname - push name onto q using the same semaphore protocol as
 * resolverThread(): reserve a free slot (sem_empty), serialise queue access
 * (sem_mutex), then announce a filled slot (sem_full).
 */
static void enqueue_hostname(struct queue *q, char *name)
{
    sem_wait(&sem_empty);
    sem_wait(&sem_mutex);
    push(q, name);
    sem_post(&sem_mutex);
    sem_post(&sem_full);
}

/*
 * requesterThread - producer thread entry point.
 *
 * receivedStruct points to the shared struct arguments. Reads one hostname
 * per line from args->input and queues each one. It also logs each hostname
 * to args->reqLog and to stdout. Once its file counter reaches
 * args->numInputFiles it queues args->poisonPill and returns. The pill goes
 * through the same semaphores as ordinary hostnames so that a resolver is
 * actually woken for it.
 *
 * Failing to open either file terminates the whole process via exit(1).
 *
 * NOTE(review): only args->input is ever opened; extra passes of the outer
 * loop just hit EOF again. With several requester threads each one reads the
 * same file and queues its own pill. Multiple producers need a shared
 * "requesters finished" count, or main() queuing the pill after joining the
 * requesters.
 */
void* requesterThread(void *receivedStruct) {
    struct arguments *args_ptr = receivedStruct;
    char line[sizeof args_ptr->line];   /* per-thread: args is shared by all threads */
    FILE *fptr;
    FILE *sptr;
    int fileCounter = 0;

    if ((fptr = fopen(args_ptr->input, "r")) == NULL) {
        fprintf(stderr, "Error! Bogus input file path.\n");
        /* exit() ends the whole process, not just this thread. */
        exit(1);
    }

    if ((sptr = fopen(args_ptr->reqLog, "w")) == NULL) {
        fprintf(stderr, "Error! Bogues output path.\n");
        fclose(fptr);
        exit(1);
    }

    for (;;) {
        while (read_hostname(fptr, line, sizeof line)) {
            enqueue_hostname(args_ptr->q, line);

            fprintf(sptr, "%s \n", line);
            printf("Hostname Logged: %s\n", line);
        }

        /* TODO: log "Thread <id> serviced <n> files". pthread_t is opaque,
         * so it cannot portably be printed with %d. */
        fileCounter++;

        /* ">=" so a numInputFiles of 0 or less cannot loop forever. */
        if (fileCounter >= args_ptr->numInputFiles) {
            enqueue_hostname(args_ptr->q, args_ptr->poisonPill);
            break;
        }
    }

    fclose(fptr);
    if (fclose(sptr) == EOF) {
        perror("requesterThread: closing request log");
    }
    return NULL;
}


/*
 * resolverThread - consumer thread entry point.
 *
 * receivedStruct points to the shared struct arguments. Repeatedly takes the
 * hostname at the head of args->q and passes it to dnslookup() (from util.c).
 * It then writes "hostname, address" to args->resLog, or "hostname, " plus a
 * stderr message when dnslookup() returns -1. Returns once the head of the
 * queue is args->poisonPill.
 *
 * The pill is matched by content (strcmp). The queue holds a copy of the
 * string, so comparing pointers can never match. It is left at the head of
 * the queue and its sem_full count is given back, so every other resolver
 * also sees it and exits.
 *
 * Failing to open the log terminates the whole process via exit(1).
 *
 * NOTE(review): each resolver opens args->resLog with "w", so several
 * resolvers would truncate and overwrite each other's output. Share one
 * FILE * (with a mutex) before running more than one resolver.
 */
void* resolverThread(void *receivedStruct) {
    struct arguments *args_ptr = receivedStruct;
    /* Per-thread buffers: the shared args->result / args->ipv6 would race. */
    char hostname[sizeof args_ptr->result];
    char address[sizeof args_ptr->ipv6];
    FILE *rptr;

    if ((rptr = fopen(args_ptr->resLog, "w")) == NULL) {
        fprintf(stderr, "Error! Bogus output file path.\n");
        exit(1);
    }

    for (;;) {
        sem_wait(&sem_full);
        sem_wait(&sem_mutex);

        if (strcmp(args_ptr->q->head->name, args_ptr->poisonPill) == 0) {
            /* Leave the pill queued for the next resolver and restore its count. */
            sem_post(&sem_mutex);
            sem_post(&sem_full);
            break;
        }

        snprintf(hostname, sizeof hostname, "%s", args_ptr->q->head->name);
        pop(args_ptr->q);

        sem_post(&sem_mutex);
        sem_post(&sem_empty);

        if (dnslookup(hostname, address, INET6_ADDRSTRLEN) == -1) {
            fprintf(stderr, "Bogus hostname.\n");
            fprintf(rptr, "%s, \n", hostname);
        } else {
            fprintf(rptr, "%s, %s\n", hostname, address);
        }
    }

    if (fclose(rptr) == EOF) {
        perror("resolverThread: closing results log");
    }
    return NULL;
}


int main(/*int argc, char *argv[]*/) {
    
        int num_req_threads;
        int num_res_threads;
        int rc1;
        int rc2;    

    sem_init(&sem_mutex, 1, 1);
        sem_init(&sem_full, 1, 0);
        sem_init(&sem_empty, 1, MAXSIZE);

    
    //instance of shared queue struct
        struct queue q;
        init(&q);
    

    //instance of arguments struct
    struct arguments args;

    args.q = &q;
    args.input = "/home/user/Desktop/PA3/input/names1.txt";//argv[5];
    args.reqLog = "/home/user/Desktop/PA3/serviced.txt";//argv[3];
    args.resLog = "/home/user/Desktop/PA3/results.txt"; //argv[4];
    args.poisonPill = "You shall not Pass\n";
    args.numInputFiles = 1;;

    num_req_threads = 1;
    num_res_threads = 1;

    if (num_req_threads > MAX_REQUESTER_THREADS) {
        printf("Cannot have more than 10 requester threads!\n"); exit(1);}
    if (num_res_threads > MAX_RESOLVER_THREADS) {
        printf("Cannot have more than 5 requester threads!\n"); exit(1);}

    //Thread Creation
    pthread_t reqThreads[num_req_threads];
    pthread_t resThreads[num_res_threads];
    for (int j = 0; j < num_res_threads; j++) {
                rc2 = pthread_create(&resThreads[j], NULL, resolverThread, (void *) &args);
                printf("Resolver thread %d created.\n", j);
        }
    for(int i = 0; i < num_req_threads; i++) {
        rc1 = pthread_create(&reqThreads[i], NULL, requesterThread, (void *)&args);
        printf("Requester thread %d created.\n", i);
    }
    /*for (int j = 0; j < num_res_threads; j++) {
                rc2 = pthread_create(&resThreads[j], NULL, resolverThread, (void *) &args);
                printf("Resolver thread %d created.\n", j);
        }*/
    if(rc1 || rc2) {
        printf("Could not create threads.\n");
            exit(-1);
    }
    for (int n = 0; n < num_res_threads; n++) {
                pthread_join(resThreads[n], 0);
    for (int m = 0; m < num_req_threads; m++) {
        pthread_join(reqThreads[m], 0);
    }
    /*for (int n = 0; n < num_res_threads; n++) {
                pthread_join(resThreads[n], 0);*/
        }
    sem_destroy(&sem_mutex);
    sem_destroy(&sem_full);
    sem_destroy(&sem_empty);
    pthread_mutex_destroy(&q.lock);
    return 0;
}

Output looks like:

>> valgrind ./mult -v

==29127== Memcheck, a memory error detector
==29127== Copyright (C) 2002-2015, and GNU GPL'd, by Julian Seward et al.
==29127== Using Valgrind-3.11.0 and LibVEX; rerun with -h for copyright info
==29127== Command: ./mult -v
==29127==
Resolver thread 0 created.
Requester thread 0 created.
Hostname Logged: facebook.com
Hostname Logged: youtube.com
Hostname Logged: yahoo.com
Hostname Logged: live.com
Hostname Logged: wikipedia.org
Hostname Logged: msn.com
Hostname Logged: baidu.com
Hostname Logged: blogspot.com
Hostname Logged: microsoft.com
Hostname Logged: qq.com
Hostname Logged: taobao.com
Hostname Logged: bing.com
Hostname Logged: sina.com.cn
Hostname Logged: soso.com
Hostname Logged: 163.com
Hostname Logged: ask.com
Hostname Logged: adobe.com
Hostname Logged: twitter.com
Hostname Logged: mozilla.com
Hostname Logged: youku.com
Hostname Logged: sdjjdsaf.com
Hostname Logged: andysayler.com
Hostname Logged: wmfo.org
Error looking up Address: Name or service not known
Bogus hostname.
^C
>>GETS CAUGHT HERE

==29127==
==29127== Process terminating with default action of signal 2 (SIGINT)
==29127==    at 0x4E4298D: pthread_join (pthread_join.c:90)
==29127==    by 0x4019E4: main (in /home/user/Desktop/PA3/mult)
==29127==
==29127== HEAP SUMMARY:
==29127==     in use at exit: 1,208 bytes in 4 blocks
==29127==   total heap usage: 371 allocs, 367 frees, 1,776,506 bytes allocated
==29127==
==29127== LEAK SUMMARY:
==29127==    definitely lost: 0 bytes in 0 blocks
==29127==    indirectly lost: 0 bytes in 0 blocks
==29127==      possibly lost: 544 bytes in 2 blocks
==29127==    still reachable: 664 bytes in 2 blocks
==29127==         suppressed: 0 bytes in 0 blocks
==29127== Rerun with --leak-check=full to see details of leaked memory
==29127==
==29127== For counts of detected and suppressed errors, rerun with: -v
==29127== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
Suraj Rao
  • 29,388
  • 11
  • 94
  • 103
Bach
  • 25
  • 4
  • Does this answer your question? [understanding of pthread\_cond\_wait() and pthread\_cond\_signal()](https://stackoverflow.com/questions/16522858/understanding-of-pthread-cond-wait-and-pthread-cond-signal) – Den-Jason Nov 01 '20 at 13:46
  • Looking again at this, I don't follow the logic. Why not have the main thread simply read the file and push all of the queries to a "request" queue, and then wait on another "response" queue for the results? The threads would then pop an entry from the "request" queue, perform the look-up and push the result on the "response" queue. You shouldn't need to be using semaphores all over the place. Also if a queue is "full", don't block; the pusher can back-off and retry. The condition variable is only then needed to "wait for something to be put in the queue if empty", more efficient than polling. – Den-Jason Nov 01 '20 at 14:04
  • Also I would consider having the main thread perform the memory grab for a "request-response" struct which will house both request and response. The queues can then simply be used to pass the request-response objects around from producer to consumer. – Den-Jason Nov 01 '20 at 14:16
  • 3
    Please ask a different question instead of changing your existing one. Doing so might invalidate existing answers – Suraj Rao Nov 24 '20 at 18:46

1 Answer

1

I suspect the cause is a race condition and/or undefined behaviour.

Both push() and pop() mutate the linked list - but they are not using the same mutex to protect access, and thus may both be running concurrently.

void pop(struct queue *q)
{
    pthread_mutex_lock(&q->lock);

and

void push(struct queue *q, char *name)
{
    pthread_mutex_lock(&q->lock2);

push() in cases where the queue is full holds lock2, then uses q->lock to wait on the condition variable until such a time as the queue is not full.

    while (q->size == MAXSIZE) {
        pthread_cond_wait(&q->cond, &q->lock);
    }

A precondition of pthread_cond_wait() is that the mutex passed to it is already locked by the calling thread, but I can't see why q->lock would be locked here.

From the man page

The pthread_cond_timedwait() and pthread_cond_wait() functions shall block on a condition variable. They shall be called with mutex locked by the calling thread or undefined behavior results.

Any error from pthread_cond_wait() is ignored. It may be prudent to check this.

As an aside, every path through push() calls malloc(), which can block. You'd be better off calling it before locking the mutex.

marko
  • 9,029
  • 4
  • 30
  • 46
  • I was just messing around with that, but even when I use the same lock I run into this infinite loop – Bach Nov 01 '20 at 01:38