0

I was able to successfully create threads for each function and a pipe for each function as well, i am not quite sure what i am missing in my execute function that makes me lose the result value as it does not get returned at run time. what am i doing wrong?

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdbool.h>
int num_ints;
typedef void (*Function)(void* input, void* output);

typedef struct Pipeline {
    Function* functions;
    int numStages;
} Pipeline;
 
Pipeline* new_Pipeline() {
    Pipeline *this = malloc(sizeof(Pipeline));
    this->functions = NULL;
    this->numStages = 0;
    printf("created the pipeline\n");
    return this;   
}

bool Pipeline_add(Pipeline* this, Function f) {
    //reallocating memory to add a stage to the functions array
    this->functions = realloc(this->functions, (this->numStages +1) * sizeof(Function));
    if (this->functions == NULL) {
        return false;
    } 
    else {
        this->functions[this->numStages] = f;
        this->numStages++;
        printf("added a stage\n");
        return true;
    }
    
}

typedef struct {
    Function function;
    int inputPipe;
    int outputPipe;
} thread_args;

void* thread_func(void* arg) {
    thread_args *data = (thread_args*) arg;

    //get the input and output pipes from the args parameter
    int inPipe = data->inputPipe;
    int outPipe = data->outputPipe;
    data->function((void*)&inPipe,(void*)&outPipe);
    return NULL;
}

void Pipeline_execute(Pipeline* this) {
    //create threads 
    pthread_t threads[this->numStages];
    thread_args args[this->numStages];
    for (int i = 0; i < this->numStages; i++) {
         //creating the pipes
        int fd[2]; 
        pipe(fd);
        //creating input and output pipes for each stage
        args[i].function = this->functions[i];
        args[i].inputPipe = fd[0];
        args[i].outputPipe = fd[1];
        if (pthread_create(&threads[i], NULL, thread_func, &args[i]) != 0) {
            printf("created a thread\n");
            perror("pthread_create\n");
        }
        if (i == this->numStages -1) {
            close(fd[0]);
        }
    }
    //waiting for threads to finish
    for (int i = 0; i < this->numStages; i++) {
        if (pthread_join(threads[i], NULL) != 0) {
            perror("pthread_join\n");
            exit(1);
        }
    }
    //closing pipes
    for (int i = 0; i < this->numStages -1; i++) {
        
    }
    
}


void Pipeline_free(Pipeline* this) {
    free(this->functions);
    free(this);
}


bool Pipeline_send(void* channel, void* buffer, size_t size) {
    if ((write(*(int*)channel, buffer, size)) != -1) {
        return true;
    } else {
        return false;
    }
    
}


bool Pipeline_receive(void* channel, void* buffer, size_t size) {
    if ((read(*(int*)channel, buffer, size)) != -1) {
        return true; 
    } else {
        return false;
    }
}
//an application created to help test the implementation of pipes.

static void generateInts(void* input, void* output) {
    printf("generateInts: thread %p\n", (void*) pthread_self());
    for (int i = 1; i <= num_ints; i++) {
        if (!Pipeline_send(output, (void*) &i, sizeof(int))) exit(EXIT_FAILURE);
    }
}


static void squareInts(void* input, void* output) {
    printf("squareInts: thread %p\n", (void*) pthread_self());
    for (int i = 1; i <= num_ints; i++) {
        int number;
        if (!Pipeline_receive(input, (void*) &number, sizeof(int))) exit(EXIT_FAILURE);
        int result = number * number;
        if (!Pipeline_send(output, (void*) &result, sizeof(int))) exit(EXIT_FAILURE);
    }
}


static void sumIntsAndPrint(void* input, void* output) {
    printf("sumIntsAndPrint: thread %p\n", (void*) pthread_self());
    int number = 0;
    int result = 0;
    for (int i = 1; i <= num_ints; i++) {
        if (!Pipeline_receive(input, (void*) &number, sizeof(int))) exit (EXIT_FAILURE);
        result += number;
    }
    printf("sumIntsAndPrint: result = %i\n", result);
}

static void cleanupExit(Pipeline *p) {
    if (p) {
        Pipeline_free(p);
    }
    exit(EXIT_FAILURE);
}


int main() {
    scanf("%d", &num_ints);
    printf("Setting up pipeline to calculate the sum of squares of integers 1 to %i.\n", num_ints);

    Pipeline *p = new_Pipeline();

    if (p == NULL) cleanupExit(p);
    if (!Pipeline_add(p, generateInts)) cleanupExit(p);
    if (!Pipeline_add(p, squareInts)) cleanupExit(p);
    if (!Pipeline_add(p, sumIntsAndPrint)) cleanupExit(p);
    Pipeline_execute(p);

    Pipeline_free(p);
    return 0;
}

i am not quite sure of what to do regarding this part this was the output i was getting:

Setting up pipeline to calculate the sum of squares of integers 1 to 10.
created the pipeline
added a stage
added a stage
added a stage
generateInts: thread 0x5677640
squareInts: thread 0x5e78640
sumIntsAndPrint: thread 0x6679640
==162384== 
==162384== HEAP SUMMARY:
==162384==     in use at exit: 584 bytes in 4 blocks
==162384==   total heap usage: 9 allocs, 5 frees, 10,096 bytes allocated
==162384==
==162384== LEAK SUMMARY:
==162384==    definitely lost: 0 bytes in 0 blocks
==162384==    indirectly lost: 0 bytes in 0 blocks
==162384==      possibly lost: 544 bytes in 2 blocks
==162384==    still reachable: 40 bytes in 2 blocks

but i am expecting something like:

Setting up pipeline to calculate the sum of squares of integers 1 to 10.
created the pipeline
added a stage
added a stage
added a stage
generateInts: thread 0x5677640
squareInts: thread 0x5e78640
sumIntsAndPrint: thread 0x6679640
sumIntsAndPrint: result = 385
==162384== 
==162384== HEAP SUMMARY:
==162384==     in use at exit: 584 bytes in 4 blocks
==162384==   total heap usage: 9 allocs, 5 frees, 10,096 bytes allocated
==162384==
==162384== LEAK SUMMARY:
==162384==    definitely lost: 0 bytes in 0 blocks
==162384==    indirectly lost: 0 bytes in 0 blocks
==162384==      possibly lost: 544 bytes in 2 blocks
==162384==    still reachable: 40 bytes in 2 blocks

when the numInts is equal to 10

0 Answers0