
First of all, thanks for reading and trying to help!

This is a multi-process, multi-threaded C program. It creates several worker processes and a dispatcher thread that communicates with the workers through unnamed pipes.

Below is an explanation of the code:

STRUCTS

The program defines two struct types:

  • worker_data with two fields, an integer id and a status string.
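  • dispatcher_args with a field dispatcher_pipe, declared as int** but actually holding the address of the 2D integer array of pipe descriptors (the dispatcher casts it back to int (*)[2]).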

Shared Memory and Semaphores

The program sets up shared memory and semaphores. The shared memory is used to store data (worker_data) about the worker processes that can be accessed by multiple processes. The semaphores are used for synchronization between processes/threads.

Global Variables

Several global variables are declared, including the shared memory variables, semaphore variables, a count of worker processes, and an array to store the PIDs of the worker processes.

Dispatcher Function

The dispatcher is a thread function that loops indefinitely. In each iteration, it checks the status of each worker in the shared memory. If a worker's status is "ready", the dispatcher changes the status to "working", writes a message into the worker's pipe, and then closes the pipe.

Worker Function

Each worker process runs the Worker function. This function uses select() to wait for data to be available on its pipe from the dispatcher. When data is available, it reads the data, performs some work (not specified in the provided code), changes its status in the shared memory to "ready", and then repeats the process indefinitely.

Main Function

The main function performs the following actions:

  1. Allocates shared memory for worker data.
  2. Unlinks and opens a semaphore.
  3. Initializes a dispatcher_args structure and assigns it the address of dispatcher_pipe.
  4. Creates worker processes using a loop and fork(). In each iteration, it sets up a pipe, forks a new process, and assigns the child process to run the Worker function.
  5. Creates a dispatcher thread that runs the dispatcher function.
  6. Waits for the dispatcher thread to finish.
  7. Cleans up by unmapping the shared memory, closing and unlinking the shared memory and semaphore, and freeing allocated memory.

The purpose of this code is for the dispatcher thread to communicate with the worker processes: using shared memory, pipes, and semaphores, the dispatcher assigns tasks to the worker processes that are available to perform a task, and waits for them to be ready for a new task. Each worker process indicates that it is ready for a new task by changing its status in the shared memory.

When I compile the program with gcc updatestatu.c -o updsts -lpthread -lrt and run it, the first loop works: the dispatcher writes a message to a worker process (for example, Worker 1), the worker reads it, and that worker's status changes correctly. In the second loop, however, Worker 1 (which performed the previous task) appears to be ready, but the worker process never receives the message; a message is only received when the dispatcher moves on to a different worker (for example, Worker 2). I don't know whether the problem is in how the worker process reads from the pipe or in how the workers' statuses are changed.

Here's the output I get (each "loop" of sending and receiving messages is separated by a blank line):

Worker 1 Status ready
Worker 1 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 1 Status ready

Worker 1 Status ready
Worker 1 Status working
message away
not read Worker 1 Status working

Worker 2 Status ready
Worker 2 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 2 Status ready
not read Worker 1 Status working

Worker 2 Status ready
Worker 2 Status working
message away
not read Worker 1 Status working
not read Worker 2 Status working

Worker 3 Status ready
Worker 3 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 3 Status ready
not read Worker 1 Status working
not read Worker 2 Status working

Worker 3 Status ready
Worker 3 Status working
message away
not read Worker 1 Status working
not read Worker 2 Status working
not read Worker 3 Status working

Worker 4 Status ready
Worker 4 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 4 Status ready
not read Worker 1 Status working
not read Worker 2 Status working
not read Worker 3 Status working

Worker 4 Status ready
Worker 4 Status working
message away
^C

Note: I pressed ^C because, as described above, once no worker is ready (all 5 workers are supposed to be working), the dispatcher's for (int i = 0; i < n_workers; i++) loop never finds a ready worker, so the program loops forever.

Here is the code

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <semaphore.h>
#include <fcntl.h>
#include <sys/mman.h>

typedef struct worker_data{
    int id;
    char status[20];
} worker_data;

typedef struct dispatcher_args{
    int** dispatcher_pipe;
} dispatcher_args;

// variables for shared memory
worker_data* workers;
size_t workers_size;

// variables for semaphores
sem_t *sem_log;

int n_workers = 5;
pid_t *pid_workers;

void* dispatcher(void *args) {
    dispatcher_args* arg = args;
    int (*dispatcher_pipe)[2] = (int (*)[2])arg->dispatcher_pipe;

    char msg[100];
    strcpy(msg,"Hello!");
    int j=0;

    printf("THREAD DISPATCHER CREATED\n");

    strcpy(msg,"Hello");

    while(1){

        for (int i = 0; i < n_workers; i++) {
            if (strcmp(workers[i].status, "ready") == 0) {
                printf("\nWorker %d Status %s\n",workers[i].id,workers[i].status);

                // update status of the worker in shared memory
                strcpy(workers[i].status, "working");

                printf("Worker %d Status %s\n",workers[i].id,workers[i].status);
                close(dispatcher_pipe[i][0]); // close the read end of the pipe
                printf("message away\n");
                write(dispatcher_pipe[i][1], &msg, sizeof(msg));
                close(dispatcher_pipe[i][1]); // close the write end of the pipe
                break;
            }
            else {
                printf("not read Worker %d Status %s\n",workers[i].id,workers[i].status);
            }
        }
        //j+=1;
        sleep(2);
    }
}

void Worker(int *worker_pipe, int worker_index) {
    char msg[100];
    sprintf(msg, "WORKER %d READY\n", workers[worker_index].id);

    fd_set set;
    FD_ZERO(&set);
    FD_SET(worker_pipe[0], &set);

    while(1){


        int ready = select(worker_pipe[0]+1, &set, NULL, NULL, NULL);
        if (ready < 0) {
            perror("select");
            exit(1);
        }
        printf("message get 1\n");
        if (FD_ISSET(worker_pipe[0], &set)) {
            printf("message get 2\n");
            // data is available, read from the pipe
            ssize_t nbytes = read(worker_pipe[0], &msg, sizeof(msg));
            if (nbytes < 0) {
                perror("read");
                exit(1);
            }
            printf("Worker message received from Dispatcher: %s\n", msg);
        }
        close(worker_pipe[1]);

        // functions of worker
        // ...

        // change status of the worker in shared memory
        strcpy(workers[worker_index].status, "ready");
        printf("JOB FINISHED Worker %d Status %s\n", workers[worker_index].id, workers[worker_index].status);

        // sleep(3);
        // printf("I am Worker and Im alive\n");
    }
}

int main() {
    pthread_t dispatcher_thread;
    int dispatcher_pipe[n_workers][2];
    workers_size = n_workers * sizeof(worker_data);
    int workers_fd = shm_open("workers", O_CREAT | O_RDWR, 0666);
    ftruncate(workers_fd, workers_size);
    workers = mmap(NULL, workers_size, PROT_READ | PROT_WRITE, MAP_SHARED, workers_fd, 0);

    pid_workers = malloc(n_workers * sizeof(pid_t));

// create semaphores
    sem_unlink("sem_log");
    sem_log = sem_open("sem_log", O_CREAT | O_EXCL, 0700, 1);

    dispatcher_args* disp_args = malloc(sizeof(dispatcher_args));
    disp_args->dispatcher_pipe= (int **) dispatcher_pipe;

// creating worker processes
    for (int i = 0; i < n_workers; i++) {
        worker_data worker;
        worker.id = i+1;
        strcpy(worker.status, "ready");
        workers[i] = worker;

        // create the unnamed pipe for the dispatcher and corresponding association
        if (pipe(dispatcher_pipe[i]) == -1) {
            printf("Error creating dispatcher pipe n1\n");
            exit(0);
        }

        pid_workers[i] = fork();

        if (pid_workers[i] == 0) {
            // worker process
            Worker(dispatcher_pipe[i], i);
            // printf("Hey\n");
        } else if (pid_workers[i] > 0) {
            // parent process
        } else {
            // error forking
            perror("fork");
            exit(EXIT_FAILURE);
        }
    }

// create dispatcher thread
    pthread_create(&dispatcher_thread, NULL, dispatcher, disp_args);

// wait for the threads to finish
    pthread_join(dispatcher_thread, NULL);

// clean up
    munmap(workers, workers_size);
    close(workers_fd);
    shm_unlink("workers");
    sem_close(sem_log);
    sem_unlink("sem_log");
    free(pid_workers);
    free(workers);
    free(disp_args);

    return 0;

}

1 Answer


Don't close the pipes!

  1. In the master process/thread, after sending the first message, you close the sending end of the pipe. So, you can never send another message.

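  2. In the worker process, after receiving the first message, you call close(worker_pipe[1]). That is the write end, which the worker never uses, so closing it doesn't stop the worker from receiving; but it belongs once, right after fork(), rather than inside the loop.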

  3. I wouldn't use a string for the worker state; I'd use an enum. A string can't be updated atomically.

  4. The master should not be setting the worker state. Only the worker should do that. Otherwise, you can have a race condition.

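  5. You want to use stdatomic.h primitives when the worker sets the state. They make the store atomic and include the memory barriers needed for the other side to see the new value rather than a stale or half-written one.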

Craig Estey
  • Done! By the way, I've run into another bug in the same code, now that I've changed the shared memory variable. Should I create another post or add it to this question? – Duarte Neves May 10 '23 at 00:19
  • @DuarteNeves Hmm, this is a judgement call. But, because you've got an answer here and changed/fixed the code for it, I'd post a _new_ question (with the _updated_ code). You can put a link to _this_ question in your new one. I'd add some text like "This is a followup question to: I got an answer and fixed the initial bug, but now I have a new bug: ... Here is the revised/updated code" That way, you'll be seen as a responsible OP who is making progress. This _isn't_ you, but some OPs just ask the same question on multiple posts [and never apply any fixes or advice they get]. – Craig Estey May 10 '23 at 00:33
  • @DuarteNeves Because _this_ question _has_ an answer [and, doubly so, because it's accepted], people are [somewhat] less likely to look at it because they assume your question has been answered. – Craig Estey May 10 '23 at 00:38
  • Got it. Meanwhile I managed to solve that bug, thanks a lot again! And by the way, I had a feeling I'd seen your name somewhere before: you were the one who helped me solve a problem in another question I asked, ahahaha (it's this one https://stackoverflow.com/questions/76159714/problem-with-threads-and-unnamed-pipes-in-c-linux) – Duarte Neves May 10 '23 at 01:05