UPDATE: I cant figure out why I am getting caught in an infinite loop. I can resolve the error by putting an exit condition when the queue reaches max size, but I want to do it by keeping a file counter (fileCounter) that exits the while loop when the code has handled all of the input files.
Earlier today I had the code running correctly, but when implementing some changes to try and handle multiple threads (more than one producer and one consumer) I seem to have broken it along the way. I know I should have had a copy somewhere before I messed with it, but I thought it would be easy to retrace my steps. My ultimate goal is to make it safe for several producer and consumer threads. I include the header and .c file below.
My shared buffer is a queue and the first three functions in my .c file are dedicated to initializing, pushing to and popping from the queue. I use a mutex lock 'lock' and a condition variable cond to manage when producer threads push to the queue vs. wait and when consumer threads pop from the queue. In my requesterThread and resolverThread function I use three semaphores (sem_mutex, sem_full, sem_empty)--one binary and two counting--that keep track of how many slots in my queue are empty and full. I have to manage three shared resources eventually, but with one producer thread and one consumer thread, the only shared resource is the queue.
Any help with this problem and perhaps some tips on how to work on making my code safe for several threads would be greatly appreciated.
Thank you
.h file
#ifndef MULTILOOKUP_H_
#define MULTILOOKUP_H_
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <sys/time.h>
#include "/home/user/Desktop/PA3/util.c"
#define MAX_REQUESTER_THREADS 10
#define MAX_RESOLVER_THREADS 5
#define MAX_INPUT_FILES 10
#define MAXSIZE 20
struct arguments {
struct queue *q;
char *input;
char *resLog;
char *reqLog;
char line[100];
char result[100];
char ipv6[100];
char *poisonPill;
int numInputFiles;
};
struct node {
char name[100];
struct node *next;
};
struct queue {
int size;
struct node *head;
struct node *tail;
pthread_mutex_t lock, lock2;
pthread_cond_t cond;
};
void init(struct queue *);
void pop(struct queue *);
void push(struct queue *, char *);
void* requesterThread(void *);
void* resolverThread(void *);
sem_t sem_mutex, sem_full, sem_empty;
#endif
.c file
#include "multilookup.h"
/*----------------------Struct Functions------------------------*/
void init(struct queue *q) {
q->head = NULL;
q->tail = NULL;
q->size = 0;
pthread_mutex_init(&q->lock, NULL);
pthread_mutex_init(&q->lock2, NULL);
pthread_cond_init(&q->cond, NULL);
}
void pop(struct queue *q)
{
pthread_mutex_lock(&q->lock);
if (q->size > 0) {pthread_cond_broadcast(&q->cond);}
q->size--;
//printf("%s\n", q->head->name);
struct node *tmp = q->head;
q->head = q->head->next;
free(tmp);
pthread_mutex_unlock(&q->lock);
}
void push(struct queue *q, char *name)
{
pthread_mutex_lock(&q->lock);
while (q->size == MAXSIZE) {
pthread_cond_wait(&q->cond, &q->lock);
}
q->size++;
if (q->head == NULL) {
q->head = (struct node *)malloc(sizeof(struct node));
//q->head->name = name;
strcpy(q->head->name, name);
q->head->next == NULL;
q->tail = q->head;
} else {
q->tail->next = (struct node *)malloc(sizeof(struct node));
//q->tail->next->name = name;
strcpy(q->tail->next->name, name);
q->tail->next->next = NULL;
q->tail = q->tail->next;
}
pthread_mutex_unlock(&q->lock);
}
/*--------------------------End of struct functions------------------*/
void* requesterThread(void *receivedStruct) {
struct arguments *args_ptr;
args_ptr = (struct arguments*) receivedStruct;
FILE *fptr;
FILE *sptr;
int fileCounter = 0;
//Check for proper file paths
if ((fptr = fopen(args_ptr->input, "r")) == NULL) {
fprintf(stderr, "Error! Bogus input file path.\n");
// Thread exits if file pointer returns NULL.
exit(1);
}
if ((sptr = fopen(args_ptr->reqLog, "w")) == NULL) {
fprintf(stderr, "Error! Bogues output path.\n");
exit(1);
}
//Read from input file and push to shared queue
while (1) {
while (fscanf(fptr,"%[^\n]%*c", args_ptr->line) != EOF) {
sem_wait(&sem_empty);
sem_wait(&sem_mutex);
push(args_ptr->q, args_ptr->line);
sem_post(&sem_mutex);
sem_post(&sem_full);
//Update requesterLog and print logged hostnames
fprintf(sptr, "%s \n", args_ptr->line);
printf("Hostname Logged: %s\n", args_ptr->line);
/*LINE TO WRITE "Thread <id> serviced ## files" TO serviced.txt
fprintf(sptr, "Thread %d serviced %d files", pthread_self(), fileCounter);
*/
}
fileCounter++;
//Condition to send poisonPill to consumer
if(fileCounter == args_ptr->numInputFiles) {
push(args_ptr->q, args_ptr->poisonPill);
break;
}
}
fclose(fptr);
fclose(sptr);
return 0;
}
void* resolverThread(void *receivedStruct) {
struct arguments *args_ptr;
args_ptr = (struct arguments *) receivedStruct;
FILE *rptr;
if( (rptr = fopen(args_ptr->resLog, "w")) == NULL) {
fprintf(stderr, "Error! Bogus output file path.\n");
exit(1);
}
while(1) {
sem_wait(&sem_full);
sem_wait(&sem_mutex);
strcpy(args_ptr->result, args_ptr->q->head->name);
pop(args_ptr->q);
sem_post(&sem_mutex);
sem_post(&sem_empty);
if (args_ptr->result != args_ptr->poisonPill) {
if (dnslookup(args_ptr->result, args_ptr->ipv6, INET6_ADDRSTRLEN) == -1){
fprintf(stderr, "Bogus hostname.\n");
fprintf(rptr, "%s, \n", args_ptr->result);
} else {
fprintf(rptr, "%s, %s\n", args_ptr->result, args_ptr->ipv6);
}
} else {break;}
}
fclose(rptr);
return 0;
}
int main(/*int argc, char *argv[]*/) {
int num_req_threads;
int num_res_threads;
int rc1;
int rc2;
sem_init(&sem_mutex, 1, 1);
sem_init(&sem_full, 1, 0);
sem_init(&sem_empty, 1, MAXSIZE);
//instance of shared queue struct
struct queue q;
init(&q);
//instance of arguments struct
struct arguments args;
args.q = &q;
args.input = "/home/user/Desktop/PA3/input/names1.txt";//argv[5];
args.reqLog = "/home/user/Desktop/PA3/serviced.txt";//argv[3];
args.resLog = "/home/user/Desktop/PA3/results.txt"; //argv[4];
args.poisonPill = "You shall not Pass\n";
args.numInputFiles = 1;;
num_req_threads = 1;
num_res_threads = 1;
if (num_req_threads > MAX_REQUESTER_THREADS) {
printf("Cannot have more than 10 requester threads!\n"); exit(1);}
if (num_res_threads > MAX_RESOLVER_THREADS) {
printf("Cannot have more than 5 requester threads!\n"); exit(1);}
//Thread Creation
pthread_t reqThreads[num_req_threads];
pthread_t resThreads[num_res_threads];
for (int j = 0; j < num_res_threads; j++) {
rc2 = pthread_create(&resThreads[j], NULL, resolverThread, (void *) &args);
printf("Resolver thread %d created.\n", j);
}
for(int i = 0; i < num_req_threads; i++) {
rc1 = pthread_create(&reqThreads[i], NULL, requesterThread, (void *)&args);
printf("Requester thread %d created.\n", i);
}
/*for (int j = 0; j < num_res_threads; j++) {
rc2 = pthread_create(&resThreads[j], NULL, resolverThread, (void *) &args);
printf("Resolver thread %d created.\n", j);
}*/
if(rc1 || rc2) {
printf("Could not create threads.\n");
exit(-1);
}
for (int n = 0; n < num_res_threads; n++) {
pthread_join(resThreads[n], 0);
for (int m = 0; m < num_req_threads; m++) {
pthread_join(reqThreads[m], 0);
}
/*for (int n = 0; n < num_res_threads; n++) {
pthread_join(resThreads[n], 0);*/
}
sem_destroy(&sem_mutex);
sem_destroy(&sem_full);
sem_destroy(&sem_empty);
pthread_mutex_destroy(&q.lock);
return 0;
}
Out put looks like
>> valgrind ./mult -v
==29127== Memcheck, a memory error detector
==29127== Copyright (C) 2002-2015, and GNU GPL'd, by Julian Seward et al.
==29127== Using Valgrind-3.11.0 and LibVEX; rerun with -h for copyright info
==29127== Command: ./mult -v
==29127==
Resolver thread 0 created.
Requester thread 0 created.
Hostname Logged: facebook.com
Hostname Logged: youtube.com
Hostname Logged: yahoo.com
Hostname Logged: live.com
Hostname Logged: wikipedia.org
Hostname Logged: msn.com
Hostname Logged: baidu.com
Hostname Logged: blogspot.com
Hostname Logged: microsoft.com
Hostname Logged: qq.com
Hostname Logged: taobao.com
Hostname Logged: bing.com
Hostname Logged: sina.com.cn
Hostname Logged: soso.com
Hostname Logged: 163.com
Hostname Logged: ask.com
Hostname Logged: adobe.com
Hostname Logged: twitter.com
Hostname Logged: mozilla.com
Hostname Logged: youku.com
Hostname Logged: sdjjdsaf.com
Hostname Logged: andysayler.com
Hostname Logged: wmfo.org
Error looking up Address: Name or service not known
Bogus hostname.
^C
>>GETS CAUGHT HERE
==29127==
==29127== Process terminating with default action of signal 2 (SIGINT)
==29127== at 0x4E4298D: pthread_join (pthread_join.c:90)
==29127== by 0x4019E4: main (in /home/user/Desktop/PA3/mult)
==29127==
==29127== HEAP SUMMARY:
==29127== in use at exit: 1,208 bytes in 4 blocks
==29127== total heap usage: 371 allocs, 367 frees, 1,776,506 bytes allocated
==29127==
==29127== LEAK SUMMARY:
==29127== definitely lost: 0 bytes in 0 blocks
==29127== indirectly lost: 0 bytes in 0 blocks
==29127== possibly lost: 544 bytes in 2 blocks
==29127== still reachable: 664 bytes in 2 blocks
==29127== suppressed: 0 bytes in 0 blocks
==29127== Rerun with --leak-check=full to see details of leaked memory
==29127==
==29127== For counts of detected and suppressed errors, rerun with: -v
==29127== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)