I am trying to create a multithreaded pipeline that stores a function in multiple threads and uses pipes to communicate with each thread and function. when i run my program it runs the same function over and over again instead of an individual function i think my pipes have issues but i'm not too sure what exactly am i doing wrong?
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdbool.h>
int num_ints;
typedef void (*Function)(void* input, void* output);
typedef struct Pipeline {
Function* functions;
int numStages;
} Pipeline;
Pipeline* new_Pipeline() {
Pipeline *this = malloc(sizeof(Pipeline));
this->functions = NULL;
this->numStages = 0;
printf("created the pipeline\n");
return this;
}
bool Pipeline_add(Pipeline* this, Function f) {
//reallocating memory to add a stage to the functions array
this->functions = realloc(this->functions, (this->numStages +1) * sizeof(Function));
if (this->functions == NULL) {
return false;
}
else {
this->functions[this->numStages] = f;
this->numStages++;
printf("added a stage\n");
return true;
}
}
typedef struct {
Function function;
int inputPipe;
int outputPipe;
} thread_args;
void* thread_func(void* arg) {
thread_args *data = (thread_args*) arg;
//get the input and output pipes from the args parameter
int inPipe = data->inputPipe;
int outPipe = data->outputPipe;
data->function((void*)&inPipe,(void*)&outPipe);
return NULL;
}
void Pipeline_execute(Pipeline* this) {
//create threads
pthread_t threads[this->numStages];
thread_args args[this->numStages];
for (int i = 0; i < this->numStages; i++) {
//creating the pipes
int fd[2];
pipe(fd);
//creating input and output pipes for each stage
args[i].function = this->functions[i];
args[i].inputPipe = fd[0];
args[i].outputPipe = fd[1];
if (pthread_create(&threads[i], NULL, thread_func, &args[i]) != 0) {
printf("created a thread\n");
perror("pthread_create\n");
}
if (i == this->numStages -1) {
close(fd[0]);
}
}
//waiting for threads to finish
for (int i = 0; i < this->numStages; i++) {
if (pthread_join(threads[i], NULL) != 0) {
perror("pthread_join\n");
exit(1);
}
}
//closing pipes
for (int i = 0; i < this->numStages -1; i++) {
}
}
void Pipeline_free(Pipeline* this) {
free(this->functions);
free(this);
}
bool Pipeline_send(void* channel, void* buffer, size_t size) {
if ((write(*(int*)channel, buffer, size)) != -1) {
return true;
} else {
return false;
}
}
bool Pipeline_receive(void* channel, void* buffer, size_t size) {
if ((read(*(int*)channel, buffer, size)) != -1) {
return true;
} else {
return false;
}
}
//an application created to help test the implementation of pipes.
static void generateInts(void* input, void* output) {
printf("generateInts: thread %p\n", (void*) pthread_self());
for (int i = 1; i <= num_ints; i++) {
if (!Pipeline_send(output, (void*) &i, sizeof(int))) exit(EXIT_FAILURE);
}
}
static void squareInts(void* input, void* output) {
printf("squareInts: thread %p\n", (void*) pthread_self());
for (int i = 1; i <= num_ints; i++) {
int number;
if (!Pipeline_receive(input, (void*) &number, sizeof(int))) exit(EXIT_FAILURE);
int result = number * number;
if (!Pipeline_send(output, (void*) &result, sizeof(int))) exit(EXIT_FAILURE);
}
}
static void sumIntsAndPrint(void* input, void* output) {
printf("sumIntsAndPrint: thread %p\n", (void*) pthread_self());
int number = 0;
int result = 0;
for (int i = 1; i <= num_ints; i++) {
if (!Pipeline_receive(input, (void*) &number, sizeof(int))) exit (EXIT_FAILURE);
result += number;
}
printf("sumIntsAndPrint: result = %i\n", result);
}
static void cleanupExit(Pipeline *p) {
if (p) {
Pipeline_free(p);
}
exit(EXIT_FAILURE);
}
int main() {
scanf("%d", &num_ints);
printf("Setting up pipeline to calculate the sum of squares of integers 1 to %i.\n", num_ints);
Pipeline *p = new_Pipeline();
if (p == NULL) cleanupExit(p);
if (!Pipeline_add(p, generateInts)) cleanupExit(p);
if (!Pipeline_add(p, squareInts)) cleanupExit(p);
if (!Pipeline_add(p, sumIntsAndPrint)) cleanupExit(p);
Pipeline_execute(p);
Pipeline_free(p);
return 0;
}
i was expecting it to run each function on a different thread i used another piece of code to debug it and it created three different threads but it was the same function for every thread. when i ran the application provided to check for the pipeline implementation it returned this to me
Setting up pipeline to calculate the sum of squares of integers 1 to 10.
created the pipeline
added a stage
added a stage
added a stage
generateInts: thread 0x5677640
squareInts: thread 0x5e78640
sumIntsAndPrint: thread 0x6679640
==162384==
==162384== HEAP SUMMARY:
==162384== in use at exit: 584 bytes in 4 blocks
==162384== total heap usage: 9 allocs, 5 frees, 10,096 bytes allocated
==162384==
==162384== LEAK SUMMARY:
==162384== definitely lost: 0 bytes in 0 blocks
==162384== indirectly lost: 0 bytes in 0 blocks
==162384== possibly lost: 544 bytes in 2 blocks
==162384== still reachable: 40 bytes in 2 blocks
i updated the code