0

The problem: Similar to one of my other Questions Other Question I am trying to create a program in C that allows me to Search through 10 text files with a variable amount of threads to find the largest Prime. It should also have a Manager thread that is allowed to read the Largest Prime number of a worker thread (and not modify it). The Manager thread also Posts the largest Prime number found by all of the worker threads so the worker threads can read it and use it. The worker threads must post their local Largest Prime to a global array (privateLargestPrime) and before they do this they must lock it so that the Manager Thread doesn't read it until the worker thread updates it.

The weird Part: As I step through my program when the worker thread wants to call a lock it switches threads to the manager which calls for a lock and is granted a lock then it keeps looping starving the Worker thread. I am Not sure what is going on with that. If I could get any insight on this problem it will be greatly appreciated.

/*
 * The Reason for Worker Initialization + Manager Initialization is that we need both types of threads to exist at the same time
 * so I just combined them into one loop, although I believe that they could have been created seperatly.
 * Basically just call pthread_Join at the end
 */

    #include <stdio.h>
    #include <stdlib.h>
    #include <math.h>
    #include <pthread.h>
    #include <time.h>
    #include <string.h>
    #include <fileTest.h>

    clock_t Start, End;
    double elapsed = 0;
    pthread_cond_t managerVar;
    pthread_mutex_t mutex;
    unsigned int globalLargestPrime = 0;
    int numThreads = 1;//Number of Threads
    int LINES_PER_THREAD;
    pthread_cond_t *WorkerConditionaVar;
    pthread_cond_t *ManagerConditionaVar;
    unsigned int *privateLocalLargest;//will need to be changed
    int *statusArray;


    FILE *fileOut;
    typedef enum{
      FREE,
      IN_USE
    }lrgstPrm;
    lrgstPrm monLargestPrime;//create enum
    lrgstPrm workerLargestPrime;//create enum

    typedef enum{
      Finished,
      Not_Finished
    }Status;
    Status is_Finished;

    typedef struct threadFields{
      int id;
      int StartPos;//gets seek for worker thread
      int EndPos;
    }tField;



    int ChkPrim(unsigned int n){
      unsigned int i;
      unsigned int root = sqrt(n);
      for(i=2; i<root; i++){
            if(n % i == 0)
              return 0;

         }
      //printf("%d \n", isPrime);
      return 1;
    }

    void *Worker(void *threadStruct){//Create Threads
      struct threadFields *info = threadStruct;
      int index;
      int id = info->id;
      unsigned int currentNum = 0;
      int Seek = info->StartPos;
      unsigned int localLargestPrime = 0;
      char *buffer = malloc(50);
      int isPrime = 0;

        while(Seek<info->EndPos){
        for(index = 0; index < 1000; index++){//Loop 1000 times
        fseek(fileOut,Seek*sizeof(char)*20, SEEK_SET);
        fgets(buffer,20,fileOut);
        Seek++;
        currentNum = atoi(buffer);
        if(currentNum>localLargestPrime && currentNum > 0){
          isPrime = ChkPrim(currentNum);
          if( isPrime == 1)
            localLargestPrime = currentNum;
        }
      }

        //while(monLargestPrime == IN_USE)
        //pthread_cond_wait(&monitor[id], &mutex);//wait untill mutex is unlocked
        //monLargestPrime = IN_USE;
        //Critical Zone
        //printf("Entering Critical Zone My ID: %d\n",id);

        /*Should Lock the Private Largest Prime from any other thread using it*/
        if(pthread_mutex_lock(&mutex) != 0)//Lock
          printf("Failed To Lock");
        while(workerLargestPrime == IN_USE)//Wait untill Workers largest prime is free
          pthread_cond_wait(ManagerConditionaVar, &mutex);
        workerLargestPrime = IN_USE;//Local Largest is in use
        privateLocalLargest[id] = localLargestPrime;//Assign Local Largest to each workers Shared Variable
        workerLargestPrime = FREE;
        pthread_cond_signal(ManagerConditionaVar);//Signal to any waiting thread that wants to touch(read) this workers privateLocalLargest
        pthread_mutex_unlock(&mutex);



        /*
        pthread_mutex_lock(&mutex);
       while(workerLargestPrime == FREE){
         workerLargestPrime = IN_USE;
         //pthread_cond_wait(&managerVar,&mutex);
        */
        if(localLargestPrime < globalLargestPrime)
          localLargestPrime = globalLargestPrime;
        /*
       workerLargestPrime = FREE;
       pthread_mutex_unlock(&mutex);

       // for(index = 0; index < numThreads; index++)
         // if(index != id)
           // pthread_cond_signal(&monitor[id]);//signal all threads that mutex is unlocked
        //monLargestPrime = FREE;
        //printf("Exiting Critical Zone My ID: %d\n",id);
    */
        //pthread_mutex_unlock(&mutex);
       }//End of While
    statusArray[id] = 1;
    void *i = 0;
    return i;
    }

    void *manager(){
        int index, MlocalLargestPrime;


        while(is_Finished==Not_Finished){
        /*Should Lock the Private Largest Prime from any other thread using it*/
            if(pthread_mutex_lock(&mutex) != 0)//Lock
              printf("Failed To Lock");
            while(workerLargestPrime == IN_USE)//Wait untill Workers largest prime is free
              pthread_cond_wait(ManagerConditionaVar, &mutex);
            workerLargestPrime = IN_USE;//Local Largest is in use
            //Critical Zone
            for(index =0; index < numThreads; index++)
                  if(privateLocalLargest[index] > MlocalLargestPrime)
                    MlocalLargestPrime = privateLocalLargest[index];
            //Critical Zone
            workerLargestPrime = FREE;
            pthread_cond_signal(ManagerConditionaVar);//Signal to any waiting thread that wants to touch(read) this workers privateLocalLargest
            pthread_mutex_unlock(&mutex);
    /*
       pthread_mutex_lock(&mutex);
       while(workerLargestPrime == FREE){
         workerLargestPrime = IN_USE;
         globalLargestPrime = MlocalLargestPrime;
         workerLargestPrime = FREE;
         pthread_cond_signal(&managerVar);

       }
       pthread_mutex_unlock(&mutex);
    */
       /*check if workers have finished*/
    for(index = 0; index < numThreads; index++)
      if(statusArray[index] == 0)
        is_Finished = Not_Finished;
    }
        void *i = 0;
        return i;
       }

    int main(){
      //setFile();
      LINES_PER_THREAD = (getLineNum()/numThreads);
      fileOut = fopen("TextFiles/dataBin.txt", "rb");
      Start = clock();
      //pthread_t managerThread;
      pthread_t threads[numThreads];
      pthread_cond_t monitor[numThreads];
      pthread_cond_t managerCon;
      WorkerConditionaVar = monitor;//Global Pointer points to the array created in main
      ManagerConditionaVar = &managerCon;
      unsigned int WorkerSharedVar[numThreads];
      privateLocalLargest = WorkerSharedVar;
      pthread_mutex_init(&mutex, NULL);
      int finishedArr[numThreads];
      statusArray = finishedArr;
      is_Finished = Not_Finished;
      int index;


          /*Worker Initialization + Manager Initialization*/
          pthread_cond_init(&managerCon,NULL);
          /*Worker Thread Struct Initalization*/
          tField *threadFields[numThreads];//sets number of thread structs
          rewind(fileOut);
          for(index = 0; index < numThreads; index++){//run through threads; inizilize the Struct for workers
              pthread_cond_init(&monitor[index], NULL);//Initialize all the conditional variables
              threadFields[index] = malloc(sizeof(tField));
              threadFields[index]->id = index;
              threadFields[index]->StartPos = index*LINES_PER_THREAD;// Get Position for start of block
              threadFields[index]->EndPos = (index+1)*LINES_PER_THREAD-1;// Get Position for end of block
        }
          /*Worker Thread Struct Initalization*/

          for(index = 0; index<numThreads+1; index++)
            if(index == numThreads)//Last Thread is Manager Thread
              pthread_create(&threads[index],NULL,manager,NULL);//Create Manager
            else//Worker Threads
              pthread_create(&threads[index],NULL,Worker,(void *)threadFields[index]);//Pass struct to each worker

          for(index = 0; index<numThreads+1; index++)
            pthread_join(threads[index], NULL);
          /*Worker Initialization + Manager Initialization*/


      /*Destroy the mutexes & conditional signals*/
          for(index = 0; index < numThreads; index++){
             pthread_cond_destroy(&WorkerConditionaVar[index]);
             }
          pthread_cond_destroy(&managerCon);

      pthread_mutex_destroy(&mutex);
    End = clock();
    elapsed = ((double) (End - Start)) / CLOCKS_PER_SEC;

    printf("This is the Time %f\n", elapsed);
    printf("This is the Largest Prime Number: %u", globalLargestPrime);
    return 0;
    }



  [1]: https://stackoverflow.com/questions/13672456/slightly-complicated-thread-synchronization

There is another C source which I only use 1 method and it is to give me the number of lines from the 10 text files, I will also post it (but not neccessary):

/*
 * fileTest.c
 *
 *  Created on: Dec 8, 2012
 *      Author: kevin
 *
 *  count number of lines
 *  divide by number of threads
 *  get the positions to hand to each thread
 *  to get positions, one needs to get the number of lines per thread,
 *  add number of lines to each: Seek*sizeof(char)*10, SEEK_SET.
 *  and hand out these positions to each thread
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <pthread.h>
#include <time.h>
#include <string.h>

  FILE *filesIn[10], *fileOut;
  int Seek;

void createText(){
  FILE *fOUT = fopen("data9.txt", "w");
  int i;
  srand(time(NULL));
  for(i=0; i<10000; i++)
  fprintf(fOUT, "%d\n",rand()%9000);
  fclose(fOUT);
}

void setFile(){
  int index;
  Seek = 0;
  char *buffer = malloc(50);
  filesIn[0] = fopen("TextFiles/primes1.txt", "r");//read Text
  filesIn[1] = fopen("TextFiles/primes2.txt", "r");//read Text
  filesIn[2] = fopen("TextFiles/primes3.txt", "r");//read Text
  filesIn[3] = fopen("TextFiles/primes4.txt", "r");//read Text
  filesIn[4] = fopen("TextFiles/primes5.txt", "r");//read Text
  filesIn[5] = fopen("TextFiles/primes6.txt", "r");//read Text
  filesIn[6] = fopen("TextFiles/primes7.txt", "r");//read Text
  filesIn[7] = fopen("TextFiles/primes8.txt", "r");//read Text
  filesIn[8] = fopen("TextFiles/primes9.txt", "r");//read Text
  filesIn[9] = fopen("TextFiles/primes10.txt", "r");//read Text
  fileOut = fopen("TextFiles/dataBin.txt", "wb");//write in bin

for(index = 0; index < 10; index++)//Run through 10 files
  while(!feof(filesIn[index])){

    fscanf(filesIn[index],"%s", buffer);//take line from input
    fseek(fileOut,Seek*sizeof(char)*20, SEEK_SET);
    fputs(buffer,fileOut);//Print line to output file
    Seek++;
  }
  fclose(filesIn[0]);
  fclose(filesIn[1]);
  fclose(filesIn[2]);
  fclose(filesIn[3]);
  fclose(filesIn[4]);
  fclose(filesIn[5]);
  fclose(filesIn[6]);
  fclose(filesIn[7]);
  fclose(filesIn[8]);
  fclose(filesIn[9]);
  fclose(fileOut);
}

void getFile(){
  int Seek = 0;
  int currentSeek = 0;
  int currentNum = 0;
  int localLargestPrime = 0;
  char *buffer = malloc(50);
  fileOut = fopen("TextFiles/dataBin.txt", "rb");
  rewind(fileOut);
  while(!feof(fileOut)){
  fseek(fileOut,Seek*sizeof(char)*20, SEEK_SET);
  fgets(buffer,10,fileOut);
  Seek++;
  currentNum = atoi(buffer);
  if(currentNum>localLargestPrime)
        if(ChkPrim(currentNum) == 1){
          localLargestPrime = currentNum;
          currentSeek = Seek*sizeof(char)*20;
          printf("the current seek is: %d\n", currentSeek);
        }
  }
  printf("This is the largest Prime: %d\n", localLargestPrime);
}

int getLineNum(){
  Seek = 0;
  int index;
  char c;
  filesIn[0] = fopen("TextFiles/primes1.txt", "r");//read Text
  filesIn[1] = fopen("TextFiles/primes2.txt", "r");//read Text
  filesIn[2] = fopen("TextFiles/primes3.txt", "r");//read Text
  filesIn[3] = fopen("TextFiles/primes4.txt", "r");//read Text
  filesIn[4] = fopen("TextFiles/primes5.txt", "r");//read Text
  filesIn[5] = fopen("TextFiles/primes6.txt", "r");//read Text
  filesIn[6] = fopen("TextFiles/primes7.txt", "r");//read Text
  filesIn[7] = fopen("TextFiles/primes8.txt", "r");//read Text
  filesIn[8] = fopen("TextFiles/primes9.txt", "r");//read Text
  filesIn[9] = fopen("TextFiles/primes10.txt", "r");//read Text
  for(index = 0; index < 10; index++)
    while((c = fgetc(filesIn[index])) != EOF)
      if(c == '\n')
        Seek++;

  return Seek;
}

enter link description here

Community
  • 1
  • 1
Kevin
  • 3,077
  • 6
  • 31
  • 77
  • 1
    Unrelated to your threading problem: `while(!feof(fileOut))` is incorrect. Read more about the "EOF anti-pattern" here: http://stackoverflow.com/questions/5431941 and http://drpaulcarter.com/cs/common-c-errors.php#4.2 – Michael Burr Dec 11 '12 at 22:34
  • Since you know the number of threads you're launching, why have a global single-largest prime *at all* ?? A simple `int largestPrime[n];` where `n` is indexed by thread ensures no thread ever walks on an other. once you finish and join all your threads, just walk the array and get the largest result. Or better still, return it as the void* from the thread proc. – WhozCraig Dec 11 '12 at 23:37

3 Answers3

0

You seem to be overdoing the synchronization of the access to globalLargestPrime. But instead of trying to fix that there might be a better way to communicate each thread's value to the manager - just have the thread function return the value it finds as an unsigned int cast to a void*. Then the manager can collect those values by just waiting on a pthread_join() for each thread to finish.

Something like the following pseudo code:

void *Worker(void *threadStruct)
{
    unsigned int largest_prime;

    // do whatever you need to do to find the largest prime in the set of numbers
    //  this thread has to deal with
    //
    // Note that nothing here should require synchronization, since the data should be
    //  completely independent of other threads

    return (void*) largest_prime;
}

 void *manager()
 {
    unsigned int largest_prime = 0;

    // do whatever to spin up the threads and keep track of them in a
    //  pthread_t[] array...

    // now wait for the threads to finish up and keep deal with the value
    //  each thread has found:
    for each (pthread* p in the pthread_t[]) { // remember - pseudo code
        void* result = 0;

        // get the result that thread found
        pthread_join( p, &result);
        unsigned int thread_prime = (unsigned int) result;

        if (largest_prime < thread_prime) {
            largest_prime = thread_prime;
        }
    }

    printf("largest prime: %u\n", largest_prime);
 }

Now all of your synchronization hassles are dealt with by pthread_join().

Michael Burr
  • 333,147
  • 50
  • 533
  • 760
  • Actually this is interesting, I again Failed to mention that the reason I am doing synchronization in the worker thread is to communicate to the manager thread the workers largest prime so far (after 1000 numbers checked). So I feel like it should be synchronized with the manager thread, the same with when the manager thread updates the global_largest (although if it checks every 1000 if it misses an update it wouldn't be that bad). But I feel like all communication with Manager should be synchronized. – Kevin Dec 11 '12 at 23:32
  • @Kevin: the `pthread_join()` call synchronizes the manager thread with the completion of the worker thread. And by happy coincidence, gives the manager thread the value returned by the completed thread. Everything is still synchronized, just in a much less complex way. – Michael Burr Dec 11 '12 at 23:39
0

Going by your problem, I think you can and should do without locks.

Use the global array to update the Manager thread from the worker threads. Since worker each thread will write to separate array index, there is only one writer per array index. Main thread can keep on reading from the same array.

Use one global variable for Largest prime number found so far (shared across all threads). For this variable, the main thread is the only writer and the worker threads are all readers.

Consistency will not be an issue since its only one variable. You need to worry about taking locks if there are more variables that need to be updated together.

Hope this helps.

Groovy
  • 516
  • 5
  • 16
0

Ok So I was doing something Funky with my conditional variables (I had way too many!) so Here I shall post my answer:

/*
 * The Reason for Worker Initialization + Manager Initialization is that we need both types of threads to exist at the same time
 * so I just combined them into one loop, although I believe that they could have been created seperatly.
 * Basically just call pthread_Join at the end
 */

#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <pthread.h>
#include <time.h>
#include <string.h>
#include "fileTest.h"

clock_t Start, End;
double elapsed = 0;
pthread_cond_t managerVar;
pthread_mutex_t mutex;
unsigned int globalLargestPrime = 0;
int *numThreads;//Number of Threads ptr
int LINES_PER_THREAD;
pthread_cond_t *WorkerConditionaVar;
pthread_cond_t *ManagerConditionaVar;
unsigned int *privateLocalLargest;//will need to be changed
int *statusArray;


FILE *fileOut;
typedef enum{
  FREE,
  IN_USE
}lrgstPrm;
lrgstPrm managerLargestPrime;//create enum
lrgstPrm workerLargestPrime;//create enum

typedef enum{
  Finished,
  Not_Finished
}Status;
Status is_Finished;

typedef struct threadFields{
  int id;
  int StartPos;//gets seek for worker thread
  int EndPos;
}tField;



int ChkPrim(unsigned int n){
  unsigned int i;
  unsigned int root = sqrt(n);
  for(i=2; i<root; i++){
        if(n % i == 0)
          return 0;

     }
  //printf("%d \n", isPrime);
  return 1;
}

void *Worker(void *threadStruct){//Create Threads
  struct threadFields *info = threadStruct;
  int index;
  int id = info->id;
  unsigned int currentNum = 0;
  int Seek = info->StartPos;
  unsigned int localLargestPrime = 0;
  char *buffer = malloc(50);
  int isPrime = 0;

    while(Seek<info->EndPos){
    for(index = 0; index < 1000; index++){//Loop 1000 times
    fseek(fileOut,Seek*sizeof(char)*20, SEEK_SET);
    fgets(buffer,20,fileOut);
    Seek++;
    currentNum = atoi(buffer);
    if(currentNum>localLargestPrime && currentNum > 0){
      isPrime = ChkPrim(currentNum);
      if( isPrime == 1)
        localLargestPrime = currentNum;
    }
  }
/*Here is where I block the manager thread read while I Write*/
    pthread_mutex_lock(&mutex);
    while(workerLargestPrime == IN_USE)
      pthread_cond_wait(WorkerConditionaVar,&mutex);
    //Critical Zone
    privateLocalLargest[id] = localLargestPrime;
    //Critical Zone
    pthread_cond_signal(WorkerConditionaVar);
    pthread_mutex_unlock(&mutex);
/*Here is where I block the manager thread read while I Write*/

/*I need to wait here until it is free to read the Managers Shared variable (GlobaLargestPrime)*/
    pthread_mutex_lock(&mutex);
      while(managerLargestPrime == IN_USE)
        pthread_cond_wait(ManagerConditionaVar,&mutex);
      //Critical Zone
    if(localLargestPrime < globalLargestPrime)
      localLargestPrime = globalLargestPrime;
      //Critical Zone
    pthread_cond_signal(ManagerConditionaVar);
    pthread_mutex_unlock(&mutex);
/*I need to wait here until it is free to read the Managers Shared variable (GlobaLargestPrime)*/
   }//End of While
statusArray[id] = 1;
return NULL;
}

void *manager(){
    int index;
    int ManagerLocalLargest = 0;


    while(is_Finished==Not_Finished){
/*I need to wait here until it is free to read the workers Shared variable (PrivateLocalLargest)*/
      pthread_mutex_lock(&mutex);
      while(workerLargestPrime == IN_USE)
        pthread_cond_wait(WorkerConditionaVar,&mutex);
      //Critical Zone
      for(index = 0; index < *numThreads; index++)
        if(privateLocalLargest[index] > ManagerLocalLargest)
          ManagerLocalLargest = privateLocalLargest[index];
      //Critical Zone
      pthread_cond_signal(WorkerConditionaVar);
      pthread_mutex_unlock(&mutex);

/*Here is where I block the worker thread read while I Write*/
      pthread_mutex_lock(&mutex);
            while(managerLargestPrime == IN_USE)
              pthread_cond_wait(ManagerConditionaVar,&mutex);
            //Critical Zone
            for(index = 0; index < *numThreads; index++)
              if(privateLocalLargest[index] > globalLargestPrime)
                globalLargestPrime = privateLocalLargest[index];
            //Critical Zone
            pthread_cond_signal(ManagerConditionaVar);
            pthread_mutex_unlock(&mutex);
/*Here is where I block the worker thread read while I Write*/

   /*check if workers have finished*/
for(index = 0; index < *numThreads; index++){
  is_Finished = Finished;
  if(statusArray[index] != 1){
    is_Finished = Not_Finished;
    }
  }
}
    return NULL;
   }

 int main(int argc, char *argv[]){
  //setFile();
int argument;
switch(argc){

case 1:
  printf("You didn't Type the number of threads you wanted... \n");
  printf("argument format: [# of Threads]\n");
  return -1;
  break;
case 2:
  if(strcmp(argv[1],"--help") == 0){
      printf("argument format: [# of Threads]\n");
      return 0;
  }
  else
    argument = atoi(argv[1]);
  break;
}
  printf("The number of threads is %d\n", argument);
  numThreads = &argument;
  LINES_PER_THREAD = (getLineNum()/(*numThreads));
  fileOut = fopen("TextFiles/dataBin.txt", "rb");
  //pthread_t managerThread;
  pthread_t threads[*numThreads];
  pthread_cond_t monitor[*numThreads];
  pthread_cond_t managerCon;
  WorkerConditionaVar = monitor;//Global Pointer points to the array created in main
  ManagerConditionaVar = &managerCon;
  unsigned int WorkerSharedVar[*numThreads];
  privateLocalLargest = WorkerSharedVar;
  pthread_mutex_init(&mutex, NULL);
  int finishedArr[*numThreads];
  statusArray = finishedArr;
  is_Finished = Not_Finished;
  int index;


      /*Worker Initialization + Manager Initialization*/
      pthread_cond_init(&managerCon,NULL);
      /*Worker Thread Struct Initalization*/
      tField *threadFields[*numThreads];//sets number of thread structs
      rewind(fileOut);
      for(index = 0; index < *numThreads; index++){//run through threads; inizilize the Struct for workers
        privateLocalLargest[index] = 0;
          pthread_cond_init(&monitor[index], NULL);//Initialize all the conditional variables
          threadFields[index] = malloc(sizeof(tField));
          threadFields[index]->id = index;
          threadFields[index]->StartPos = index*LINES_PER_THREAD;// Get Position for start of block
          threadFields[index]->EndPos = (index+1)*LINES_PER_THREAD-1;// Get Position for end of block
    }
      /*Worker Thread Struct Initalization*/
      Start = clock();
      for(index = 0; index<*numThreads+1; index++)
        if(index == *numThreads)//Last Thread is Manager Thread
          pthread_create(&threads[index],NULL,manager,NULL);//Create Manager
        else//Worker Threads
          pthread_create(&threads[index],NULL,Worker,(void *)threadFields[index]);//Pass struct to each worker

      for(index = 0; index<*numThreads+1; index++)
        pthread_join(threads[index], NULL);
      /*Worker Initialization + Manager Initialization*/


  /*Destroy the mutexes & conditional signals*/
      for(index = 0; index < *numThreads; index++){
         pthread_cond_destroy(&WorkerConditionaVar[index]);
         }
      pthread_cond_destroy(&managerCon);

  pthread_mutex_destroy(&mutex);
End = clock();
elapsed = ((double) (End - Start)) / CLOCKS_PER_SEC;

printf("This is the Time %f\n", elapsed);
printf("This is the Largest Prime Number: %u\n", globalLargestPrime);
return 0;
}

Also I solved my problem with the number of threads and Conditional variables needing to be hard-coded in, Now It can just be entered in as a parameter. Thanks everyone for all the support.

PS. I have noticed that having 2 threads does not speed up the process (I assumed it would) and my pc is a dual core. it could be because of the Mutex locks and all of the blocking. I also Noticed that the more threads the longer it takes for them to process the data... hrmm if anyone sees this and can give me some insight please pm me or write a comment. Thanks (the other c file stayed the same).

Kevin
  • 3,077
  • 6
  • 31
  • 77