1

I am trying to create a multithreaded pipeline that stores a function in multiple threads and uses pipes to communicate with each thread and function. when i run my program it runs the same function over and over again instead of an individual function i think my pipes have issues but i'm not too sure what exactly am i doing wrong?

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdbool.h>
int num_ints;
typedef void (*Function)(void* input, void* output);

typedef struct Pipeline {
    Function* functions;
    int numStages;
} Pipeline;
 
Pipeline* new_Pipeline() {
    Pipeline *this = malloc(sizeof(Pipeline));
    this->functions = NULL;
    this->numStages = 0;
    printf("created the pipeline\n");
    return this;   
}

bool Pipeline_add(Pipeline* this, Function f) {
    //reallocating memory to add a stage to the functions array
    this->functions = realloc(this->functions, (this->numStages +1) * sizeof(Function));
    if (this->functions == NULL) {
        return false;
    } 
    else {
        this->functions[this->numStages] = f;
        this->numStages++;
        printf("added a stage\n");
        return true;
    }
    
}

typedef struct {
    Function function;
    int inputPipe;
    int outputPipe;
} thread_args;

void* thread_func(void* arg) {
    thread_args *data = (thread_args*) arg;

    //get the input and output pipes from the args parameter
    int inPipe = data->inputPipe;
    int outPipe = data->outputPipe;
    data->function((void*)&inPipe,(void*)&outPipe);
    return NULL;
}

void Pipeline_execute(Pipeline* this) {
    //create threads 
    pthread_t threads[this->numStages];
    thread_args args[this->numStages];
    for (int i = 0; i < this->numStages; i++) {
         //creating the pipes
        int fd[2]; 
        pipe(fd);
        //creating input and output pipes for each stage
        args[i].function = this->functions[i];
        args[i].inputPipe = fd[0];
        args[i].outputPipe = fd[1];
        if (pthread_create(&threads[i], NULL, thread_func, &args[i]) != 0) {
            printf("created a thread\n");
            perror("pthread_create\n");
        }
        if (i == this->numStages -1) {
            close(fd[0]);
        }
    }
    //waiting for threads to finish
    for (int i = 0; i < this->numStages; i++) {
        if (pthread_join(threads[i], NULL) != 0) {
            perror("pthread_join\n");
            exit(1);
        }
    }
    //closing pipes
    for (int i = 0; i < this->numStages -1; i++) {
        
    }
    
}


void Pipeline_free(Pipeline* this) {
    free(this->functions);
    free(this);
}


bool Pipeline_send(void* channel, void* buffer, size_t size) {
    if ((write(*(int*)channel, buffer, size)) != -1) {
        return true;
    } else {
        return false;
    }
    
}


bool Pipeline_receive(void* channel, void* buffer, size_t size) {
    if ((read(*(int*)channel, buffer, size)) != -1) {
        return true; 
    } else {
        return false;
    }
}
//an application created to help test the implementation of pipes.

static void generateInts(void* input, void* output) {
    printf("generateInts: thread %p\n", (void*) pthread_self());
    for (int i = 1; i <= num_ints; i++) {
        if (!Pipeline_send(output, (void*) &i, sizeof(int))) exit(EXIT_FAILURE);
    }
}


static void squareInts(void* input, void* output) {
    printf("squareInts: thread %p\n", (void*) pthread_self());
    for (int i = 1; i <= num_ints; i++) {
        int number;
        if (!Pipeline_receive(input, (void*) &number, sizeof(int))) exit(EXIT_FAILURE);
        int result = number * number;
        if (!Pipeline_send(output, (void*) &result, sizeof(int))) exit(EXIT_FAILURE);
    }
}


static void sumIntsAndPrint(void* input, void* output) {
    printf("sumIntsAndPrint: thread %p\n", (void*) pthread_self());
    int number = 0;
    int result = 0;
    for (int i = 1; i <= num_ints; i++) {
        if (!Pipeline_receive(input, (void*) &number, sizeof(int))) exit (EXIT_FAILURE);
        result += number;
    }
    printf("sumIntsAndPrint: result = %i\n", result);
}

static void cleanupExit(Pipeline *p) {
    if (p) {
        Pipeline_free(p);
    }
    exit(EXIT_FAILURE);
}


int main() {
    scanf("%d", &num_ints);
    printf("Setting up pipeline to calculate the sum of squares of integers 1 to %i.\n", num_ints);

    Pipeline *p = new_Pipeline();

    if (p == NULL) cleanupExit(p);
    if (!Pipeline_add(p, generateInts)) cleanupExit(p);
    if (!Pipeline_add(p, squareInts)) cleanupExit(p);
    if (!Pipeline_add(p, sumIntsAndPrint)) cleanupExit(p);
    Pipeline_execute(p);

    Pipeline_free(p);
    return 0;
}

i was expecting it to run each function on a different thread i used another piece of code to debug it and it created three different threads but it was the same function for every thread. when i ran the application provided to check for the pipeline implementation it returned this to me

Setting up pipeline to calculate the sum of squares of integers 1 to 10.
created the pipeline
added a stage
added a stage
added a stage
generateInts: thread 0x5677640
squareInts: thread 0x5e78640
sumIntsAndPrint: thread 0x6679640
==162384== 
==162384== HEAP SUMMARY:
==162384==     in use at exit: 584 bytes in 4 blocks
==162384==   total heap usage: 9 allocs, 5 frees, 10,096 bytes allocated
==162384==
==162384== LEAK SUMMARY:
==162384==    definitely lost: 0 bytes in 0 blocks
==162384==    indirectly lost: 0 bytes in 0 blocks
==162384==      possibly lost: 544 bytes in 2 blocks
==162384==    still reachable: 40 bytes in 2 blocks

i updated the code

2 Answers2

1

In Pipeline_execute ...

  1. You are using the fd from the pipe call incorrectly.

  2. You are setting the input and output for each thread to the same pipe. So, it writes to itself (creating an infinite data loop?).

  3. You have to "stagger" the pipes so the output of stage 0 goes to the input of stage 1 (e.g.).

Also, your thread function must close the pipe units after returning from the target function.


Here is the refactored [working?] code. In particular, note the changes in Pipeline_execute. Also, the changes in thread_func.

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdbool.h>

#define CLOSEME(_fd) \
    do { \
        if (_fd < 0) \
            break; \
        close(_fd); \
        _fd = -1; \
    } while (0)

int num_ints;
typedef void (*Function) (void *input, void *output);

typedef struct Pipeline {
    Function *functions;
    int numStages;
} Pipeline;

Pipeline *
new_Pipeline()
{
    Pipeline *this = malloc(sizeof(Pipeline));

    this->functions = NULL;
    this->numStages = 0;
    printf("created the pipeline\n");
    return this;
}

bool
Pipeline_add(Pipeline *this, Function f)
{
    // reallocating memory to add a stage to the functions array
    this->functions = realloc(this->functions, (this->numStages + 1) * sizeof(Function));
    if (this->functions == NULL) {
        return false;
    }
    else {
        this->functions[this->numStages] = f;
        this->numStages++;
        printf("added a stage\n");
        return true;
    }

}

typedef struct {
    Function function;
    int inputPipe;
    int outputPipe;
} thread_args;

void *
thread_func(void *arg)
{
    thread_args *data = (thread_args *) arg;

    // get the input and output pipes from the args parameter
    int inPipe = data->inputPipe;
    int outPipe = data->outputPipe;

    data->function((void *) &inPipe, (void *) &outPipe);

    CLOSEME(data->inputPipe);
    CLOSEME(data->outputPipe);

    return NULL;
}

void
Pipeline_execute(Pipeline *this)
{
    // create threads
    pthread_t threads[this->numStages];
    thread_args args[this->numStages];

    int fd[2] = { -1, -1 };

    for (int i = 0; i < this->numStages; i++) {
        // use input side from _prior_ stage for our input
        args[i].inputPipe = fd[0];

        // last stage has _no_ output
        if (i == this->numStages - 1) {
            fd[0] = -1;
            fd[1] = -1;
        }
        else
            pipe(fd);

        // set output for _this_ stage from the pipe output
        args[i].outputPipe = fd[1];

        // creating input and output pipes for each stage
        args[i].function = this->functions[i];
        if (pthread_create(&threads[i], NULL, thread_func, &args[i]) != 0) {
            printf("created a thread\n");
            perror("pthread_create\n");
        }
    }

    // waiting for threads to finish
    for (int i = 0; i < this->numStages; i++) {
        if (pthread_join(threads[i], NULL) != 0) {
            perror("pthread_join\n");
            exit(1);
        }
    }

    // closing pipes
    // Now done in thread_func
    for (int i = 0; i < this->numStages - 1; i++) {
    }
}

void
Pipeline_free(Pipeline *this)
{
    free(this->functions);
    free(this);
}

bool
Pipeline_send(void *channel, void *buffer, size_t size)
{
    if ((write(*(int *) channel, buffer, size)) != -1) {
        return true;
    }
    else {
        return false;
    }

}

bool
Pipeline_receive(void *channel, void *buffer, size_t size)
{
    if ((read(*(int *) channel, buffer, size)) != -1) {
        return true;
    }
    else {
        return false;
    }
}

//an application created to help test the implementation of pipes.

static void
generateInts(void *input, void *output)
{
    printf("generateInts: thread %p\n", (void *) pthread_self());
    for (int i = 1; i <= num_ints; i++) {
        if (!Pipeline_send(output, (void *) &i, sizeof(int)))
            exit(EXIT_FAILURE);
    }
}

static void
squareInts(void *input, void *output)
{
    printf("squareInts: thread %p\n", (void *) pthread_self());
    for (int i = 1; i <= num_ints; i++) {
        int number;

        if (!Pipeline_receive(input, (void *) &number, sizeof(int)))
            exit(EXIT_FAILURE);
        int result = number * number;

        if (!Pipeline_send(output, (void *) &result, sizeof(int)))
            exit(EXIT_FAILURE);
    }
}

static void
sumIntsAndPrint(void *input, void *output)
{
    printf("sumIntsAndPrint: thread %p\n", (void *) pthread_self());
    int number = 0;
    int result = 0;

    for (int i = 1; i <= num_ints; i++) {
        if (!Pipeline_receive(input, (void *) &number, sizeof(int)))
            exit(EXIT_FAILURE);
        result += number;
    }
    printf("sumIntsAndPrint: result = %i\n", result);
}

static void
cleanupExit(Pipeline * p)
{
    if (p) {
        Pipeline_free(p);
    }
    exit(EXIT_FAILURE);
}

int
main()
{
    printf("Enter number of ints:\n");
    scanf("%d", &num_ints);
    printf("Setting up pipeline to calculate the sum of squares of integers 1 to %i.\n", num_ints);

    Pipeline *p = new_Pipeline();

    if (p == NULL)
        cleanupExit(p);
    if (!Pipeline_add(p, generateInts))
        cleanupExit(p);
    if (!Pipeline_add(p, squareInts))
        cleanupExit(p);
    if (!Pipeline_add(p, sumIntsAndPrint))
        cleanupExit(p);
    Pipeline_execute(p);

    Pipeline_free(p);
    return 0;
}

Here is the program output:

Enter number of ints:
7
Setting up pipeline to calculate the sum of squares of integers 1 to 7.
created the pipeline
added a stage
added a stage
added a stage
generateInts: thread 0x7fc5c10a4700
squareInts: thread 0x7fc5c08a3700
sumIntsAndPrint: thread 0x7fc5c00a2700
sumIntsAndPrint: result = 140

This is based upon my answer: fd leak, custom Shell It is using fork instead of pthread_create but the issues were similar.

Craig Estey
  • 30,627
  • 4
  • 24
  • 48
  • thank you thank you so much i am not used to C so i didn't notice that but thank you very much – posi odukale Apr 10 '23 at 23:57
  • You're welcome! Glad to help. I've been doing multithreaded code and such pipelines since 2004, so I'm happy to pass along some of my knowledge. – Craig Estey Apr 11 '23 at 00:00
0

You are passing this to each of your threads:

        thread_args args = {
            .function = this->functions[i],
            .inputPipe = fd[0],
            .outputPipe = fd[1],
        };

This lives on the stack, and does not persist outside of the loop it is defined in. It may have ceased to exist by the time your thread runs, or multiple threads may end up reading the same values. You're almost certainly passing the same address to all of your threads.

You're doing the equivalent of:

int a;

a = 1;
pthread_create(..., &a);

a = 2;
pthread_create(..., &a);

But there's no guarantee of when any of these threads will be run.

You need to allocate structures for each thread, that last as long as the threads do. Simplest might be to define:

    thread_args args[this->num_stages];

at the top of Pipeline_execute() and populate that.

i.e. you want something like:

    thread_args args[this->num_stages];

    for (int i = 0; i < this->numStages; i++) {
        //creating the pipes
        int fd[2]; 
        pipe(fd);
        //creating input and output pipes for each stage
        args[i].function = this->functions[i],
        args[i].inputPipe = fd[0],
        args[i].outputPipe = fd[1],

        if (pthread_create(&threads[i], NULL, thread_func, &args[i]) != 0) {
            printf("created a thread\n");
            perror("pthread_create\n");
        }
    ...
    }
pmacfarlane
  • 3,057
  • 1
  • 7
  • 24
  • how would i go about populating the args array and storing it then using it in the pthread_create() method – posi odukale Apr 10 '23 at 22:19
  • Added more detail to show how to do it. – pmacfarlane Apr 10 '23 at 22:28
  • i tried this new method but my code is not producing a result and it also complains about a memory leak? – posi odukale Apr 10 '23 at 22:37
  • i updated the question to show the new output do you know why it''s outputting this now? and once again thank you for helping me solve the issue – posi odukale Apr 10 '23 at 22:44
  • You're not linking your threads together properly. The "out" pipe for one thread needs to be the "in" pipe for the next. Your last thread is writing its output into a pipe which nothing is reading from. I'm out of time to fix these problems. If someone wants to edit my answer (or post a better one) then they should. – pmacfarlane Apr 10 '23 at 23:30
  • My advice would be: get rid of all the unnecessary `(void *)` casts, get rid of all the unnecessary pointer indirection, and add debug logging whenever you call `exit()`. Research what `perror()` does. You'll either figure it out yourself, or you'll have a nicer question to ask here. – pmacfarlane Apr 10 '23 at 23:42
  • i was asked to code to a certain implementation and in the implementation the function method only takes in void* arguments – posi odukale Apr 10 '23 at 23:44