1

First thing, thanks for reading and trying to help!

In the following code it is meant to create 5 Worker Processes and be listening (read) some information from a unnamed pipes that is passed as an argument (that means the unnamed pipe is fixed to that Worker Process), the program also creates a single threat named Dispatcher, the thread is meant to send information (write) to all unnamed pipes so the Worker Process can read the information and print it out. The unnamed pipes are stored in a array dispatcher_pipes.

First I wrote information into the unnamed pipes in the main function and it worked, the Worker Process printed the information I wrote in the unnamed pipes, but when I tried to implement that function to the dispacher threat it didnt work.

Here is the current code that doesnt work

NOTE: I compile the code in the linux terminal with gcc

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>



typedef struct msgbuf {
    long mtype;      // if 0 it did not pass in no process, 1 - worker, 2 - alert watcher
    char mtext[100];
}msgbuf;

int n_workers = 5;
pid_t *pid_workers;




void* dispatcher(void *args) {

    int (*dispatcher_pipe)[2] = (int (*)[2])args;

    pthread_t id=pthread_self();

    printf("Dispatcher thread started.\n");

    msgbuf send_msg;
    send_msg.mtype = 0;
    strcpy(send_msg.mtext, "Message from Dispatcher");

    for (int i = 0; i < n_workers; i++) {
        close(dispatcher_pipe[i][0]); // Close the read end of the pipe
        write(dispatcher_pipe[i][1], &send_msg, sizeof(msgbuf));
        close(dispatcher_pipe[i][1]); // Close the write end of the pipe
    }


    usleep(10); // simulates the processing of the order
    printf("Thread %ld: Worker  finishing\n",id);
    pthread_exit(NULL);


}

void Worker(int *worker_pipe){

    // Register the signal handler for SIGUSR1
    //signal(SIGUSR1, sigint);

    printf("Worker Process (PID=%d): starting!\n",getpid());
    //printf("child getppid() :%ld\n", (long)getppid());
    //sleep(5);


    //Getting message from Dispatcher

    close(worker_pipe[1]); // Close the write end of the pipe

    msgbuf rec_msg;
    read(worker_pipe[0], rec_msg.mtext, sizeof(msgbuf));

    printf("Worker message received from Dispatcher: %s\n", rec_msg.mtext);

    close(worker_pipe[0]); // Close the read end of the pipe

    sleep(1);

    //this exit is only here for debugging purpose as well on the other processes, change it to kill like
    printf("\nWorker process %d has died.\n", getpid());

    exit(0);

}

void main(){


    //variables for the threads
    //pthread_t system_threads[3];
    pthread_t console_thread, sensor_thread, dispatcher_thread;

    //variable for unnamed pipes
    int dispatcher_pipe[n_workers][2];


    pid_workers= malloc(n_workers * sizeof(pid_t));

    //Creates the unnamed pipes for dispatcher

    for(int i=0;i<n_workers;i++){

        if(pipe(dispatcher_pipe[i]) == -1){
            printf("Error creating dispatcher pipe n1\n");
            exit(0);
        }
    }


    //Creating Worker Processes

    for(int i=0;i<n_workers;i++){

        pid_workers[i] = fork();

        if (pid_workers[i] == 0) {
            //Child process
            //printf("YO IM A CHILD\n");
            Worker(dispatcher_pipe[i]);
            //printf("Hey\n");


        } else if (pid_workers[i] > 0) {
            //Parent process
            //printf("YO IM A FADER\n");


        } else {
            //Error forking
            perror("fork");
            exit(EXIT_FAILURE);
        }

    }

    //Creating Threads
    pthread_create(&dispatcher_thread, NULL, dispatcher, dispatcher_pipe);   // ATENCAO arg fica NULL?

    //Wait for the threads to finish
    pthread_join(dispatcher_thread, NULL);

}

Output

Here is the code that worked without the unnamed pipes beind written in the dispatcher thread

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>



typedef struct msgbuf {
    long mtype;      
    char mtext[100];
}msgbuf;

int n_workers = 5;
pid_t *pid_workers;


void* dispatcher(void *args) {

    int (*dispatcher_pipe)[2] = (int (*)[2])args;

    pthread_t id=pthread_self();

    printf("Dispatcher thread started.\n");

    for(int i=0;i<n_workers;i++){
        close(dispatcher_pipe[i][0]); // Close the read end of the pipe

        msgbuf send_msg;
        send_msg.mtype = 1;
        snprintf(send_msg.mtext, sizeof(send_msg.mtext), "Message from SystemManager to Worker %d", i);

        write(dispatcher_pipe[i][1], send_msg.mtext, sizeof(send_msg.mtext));

        close(dispatcher_pipe[i][1]); // Close the write end of the pipe

    }


    usleep(10); // simulates the processing of the order
    printf("Thread %ld: Worker  finishing\n",id);
    pthread_exit(NULL);


}

void Worker(int *worker_pipe){

    // Register the signal handler for SIGUSR1
    //signal(SIGUSR1, sigint);

    printf("Worker Process (PID=%d): starting!\n",getpid());
    //printf("child getppid() :%ld\n", (long)getppid());
    //sleep(5);


    //Getting message from Dispatcher

    close(worker_pipe[1]); // Close the write end of the pipe

    msgbuf rec_msg;
    read(worker_pipe[0], rec_msg.mtext, sizeof(msgbuf));

    printf("Worker message received from Dispatcher: %s\n", rec_msg.mtext);

    close(worker_pipe[0]); // Close the read end of the pipe





    sleep(1);

    //this exit is only here for debugging purpose as well on the other processes, change it to kill like
    printf("\nWorker process %d has died.\n", getpid());

    exit(0);

}

void main(){


    //variables for the threads
    //pthread_t system_threads[3];
    pthread_t console_thread, sensor_thread, dispatcher_thread;

    //variable for unnamed pipes
    int dispatcher_pipe[n_workers][2];


    pid_workers= malloc(n_workers * sizeof(pid_t));

    //Creates the unnamed pipes for dispatcher

    for(int i=0;i<n_workers;i++){

        if(pipe(dispatcher_pipe[i]) == -1){
            printf("Error creating dispatcher pipe n1\n");
            exit(0);
        }
    }


    //Creating Worker Processes

    for(int i=0;i<n_workers;i++){

        pid_workers[i] = fork();

        if (pid_workers[i] == 0) {
            //Child process
            //printf("YO IM A CHILD\n");
            Worker(dispatcher_pipe[i]);
            //printf("Hey\n");


        } else if (pid_workers[i] > 0) {
            //Parent process
            //printf("YO IM A FADER\n");


        } else {
            //Error forking
            perror("fork");
            exit(EXIT_FAILURE);
        }


    }
}


Output

  • 1
    Your issue is that you're creating _all_ your pipes _before_ you do the loop that does `fork`. This means children hold open pipe ends that don't belong to them, so they won't see EOF on the pipes. Move `pipe` call into the `fork` loop and `int dispatcher_pipe[n_workers][2];` --> `int dispatcher_pipe[2];` For a similar issue, see my answer: [Piping between several processes in C](https://stackoverflow.com/a/75431223/5382650). In that answer, there is a link to an answer of mine that shows how to do it correctly: [fd leak, custom Shell](https://stackoverflow.com/a/52825582/5382650) – Craig Estey May 03 '23 at 01:45
  • 1
    Thanks for the help! I understand now my error and the program is now working as I intended. Thanks again! – Duarte Neves May 03 '23 at 13:30

1 Answers1

0

As explained by Craig Estey in the comments:

Your issue is that you're creating all your pipes before you do the loop that does fork. This means children hold open pipe ends that don't belong to them, so they won't see EOF on the pipes. Move pipe call into the fork loop and int dispatcher_pipe[n_workers][2]; --> int dispatcher_pipe[2];

For a similar issue, see my answer: Piping between several processes in C. In that answer, there is a link to an answer of mine that shows how to do it correctly: fd leak, custom Shell

I solved the problem with this updated code:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>

typedef struct msgbuf {
    long mtype;      // if 0 it did not pass in no process, 1 - worker, 2 - alert watcher
    char mtext[100];
}msgbuf;

int n_workers = 5;
pid_t *pid_workers;

void* dispatcher(void *args) {

    int (*dispatcher_pipe)[2] = (int (*)[2])args;

    pthread_t id=pthread_self();

    printf("Dispatcher thread started.\n");

    msgbuf send_msg;
    send_msg.mtype = 0;
    strcpy(send_msg.mtext, "Message from Dispatcher");

    for (int i = 0; i < n_workers; i++) {
        close(dispatcher_pipe[i][0]); // Close the read end of the pipe
        write(dispatcher_pipe[i][1], &send_msg, sizeof(msgbuf));
        close(dispatcher_pipe[i][1]); // Close the write end of the pipe
    }

    usleep(10); // simulates the processing of the order
    printf("Thread %ld: Worker finishing\n",id);
    pthread_exit(NULL);
}

void Worker(int *worker_pipe){

    printf("Worker Process (PID=%d): starting!\n",getpid());

    //Getting message from Dispatcher
    close(worker_pipe[1]); // Close the write end of the pipe

    msgbuf rec_msg;
    read(worker_pipe[0], &rec_msg, sizeof(msgbuf));

    printf("Worker message received from Dispatcher: %s\n", rec_msg.mtext);

    close(worker_pipe[0]); // Close the read end of the pipe

    sleep(1);

    printf("\nWorker process %d has died.\n", getpid());

    exit(0);
}

void main(){

    pthread_t dispatcher_thread;

    int dispatcher_pipe[n_workers][2];

    pid_workers = malloc(n_workers * sizeof(pid_t));

    // Creating Worker Processes
    for(int i = 0; i < n_workers; i++){

        // Create the pipe inside the loop
        if(pipe(dispatcher_pipe[i]) == -1){
            printf("Error creating dispatcher pipe n1\n");
            exit(0);
        }

        pid_workers[i] = fork();

        if (pid_workers[i] == 0) {
            // Child process
            Worker(dispatcher_pipe[i]);

        } else if (pid_workers[i] > 0) {
            // Parent process

        } else {
            // Error forking
            perror("fork");
            exit(EXIT_FAILURE);
        }
    }

    // Creating Threads
    pthread_create(&dispatcher_thread, NULL, dispatcher, dispatcher_pipe);

    // Wait for the threads to finish
    pthread_join(dispatcher_thread, NULL);
}
tdy
  • 36,675
  • 19
  • 86
  • 83