First thing, thanks for reading and trying to help!
This is a multi-process and multi-threaded code in C. It involves creating multiple worker processes and a dispatcher thread that communicates with the workers through unnamed pipes.
Below is a explanation of the code:
STRUCTS
The program defines two struct types:
worker_data
with two fields, an integerid
and a status string.dispatcher_args
with a fielddispatcher_pipe
which is a pointer to a 2D integer array.
Shared Memory and Semaphores
The program sets up shared memory and semaphores. The shared memory is used to store data (worker_data
) about the worker processes that can be accessed by multiple processes. The semaphores are used for synchronization between processes/threads.
Global Variables
Several global variables are declared, including the shared memory variables, semaphore variables, a count of worker processes, and an array to store the PIDs of the worker processes.
Dispatcher Function
The dispatcher is a thread function that loops indefinitely. In each iteration, it checks the status of each worker in the shared memory. If a worker's status is "ready", the dispatcher changes the status to "working", writes a message into the worker's pipe, and then closes the pipe.
Worker Function
Each worker process runs the Worker function. This function uses select()
to wait for data to be available on its pipe from the dispatcher. When data is available, it reads the data, performs some work (not specified in the provided code), changes its status in the shared memory to "ready", and then repeats the process indefinitely.
Main Function
The main function performs the following actions:
- Allocates shared memory for worker data.
- Unlinks and opens a semaphore.
- Initializes a
dispatcher_args
structure and assigns it the address ofdispatcher_pipe
. - Creates worker processes using a loop and fork(). In each iteration, it sets up a pipe, forks a new process, and assigns the child process to run the Worker function.
- Creates a dispatcher thread that runs the
dispatcher
function. - Waits for the dispatcher thread to finish.
- Cleans up by unmapping the shared memory, closing and unlinking the shared memory and semaphore, and freeing allocated memory.
The purpose of this code is to the Dispatcher Threat communicate with the Worker Process, the Dispatcher assigns tasks to Worker processes and waits for them to be ready for a new task. Each worker process indicates its readiness for a new task by changing its status in the shared memory.
that are avaiable to perform a task, using shared memory, pipes, and semaphores.
When I compile the program with gcc updatestatu.c -o updsts -lpthread -lrt, the program does the "first loop", that mean it writes and reads to a Worker Process (for example 1) and it changes correctly the status of that Worker (1), but in the "second loop", the Worker (1) that performed the task before appears to be ready, but in the Worker Process it doesnt gets any message, only when it goes to the next Worker (a different one like Worker 2) it gets the message. I dont know if it is something with the reading of the Worker Process or something with the changes of the status of the Worker Process.
Heres the output I am getting (the "loops" of sending and getting messages are separated by a \n)
Worker 1 Status ready
Worker 1 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 1 Status ready
Worker 1 Status ready
Worker 1 Status working
message away
not read Worker 1 Status working
Worker 2 Status ready
Worker 2 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 2 Status ready
not read Worker 1 Status working
Worker 2 Status ready
Worker 2 Status working
message away
not read Worker 1 Status working
not read Worker 2 Status working
Worker 3 Status ready
Worker 3 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 3 Status ready
not read Worker 1 Status working
not read Worker 2 Status working
Worker 3 Status ready
Worker 3 Status working
message away
not read Worker 1 Status working
not read Worker 2 Status working
not read Worker 3 Status working
Worker 4 Status ready
Worker 4 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 4 Status ready
not read Worker 1 Status working
not read Worker 2 Status working
not read Worker 3 Status working
Worker 4 Status ready
Worker 4 Status working
message away
^C
Note, I pressed ^C because as I said in the problem in the cycle " for (int i = 0; i < n_workers; i++) " as there is no Worker ready (all 5 workers are supposed to be working) it creates an infinite loop.
Here is the code
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <semaphore.h>
#include <fcntl.h>
#include <sys/mman.h>
typedef struct worker_data{
int id;
char status[20];
} worker_data;
typedef struct dispatcher_args{
int** dispatcher_pipe;
} dispatcher_args;
// variables for shared memory
worker_data* workers;
size_t workers_size;
// variables for semaphores
sem_t *sem_log;
int n_workers = 5;
pid_t *pid_workers;
void* dispatcher(void *args) {
dispatcher_args* arg = args;
int (*dispatcher_pipe)[2] = (int (*)[2])arg->dispatcher_pipe;
char msg[100];
strcpy(msg,"Hello!");
int j=0;
printf("THREAD DISPATCHER CREATED\n");
strcpy(msg,"Hello");
while(1){
for (int i = 0; i < n_workers; i++) {
if (strcmp(workers[i].status, "ready") == 0) {
printf("\nWorker %d Status %s\n",workers[i].id,workers[i].status);
// update status of the worker in shared memory
strcpy(workers[i].status, "working");
printf("Worker %d Status %s\n",workers[i].id,workers[i].status);
close(dispatcher_pipe[i][0]); // close the read end of the pipe
printf("message away\n");
write(dispatcher_pipe[i][1], &msg, sizeof(msg));
close(dispatcher_pipe[i][1]); // close the write end of the pipe
break;
}
else {
printf("not read Worker %d Status %s\n",workers[i].id,workers[i].status);
}
}
//j+=1;
sleep(2);
}
}
void Worker(int *worker_pipe, int worker_index) {
char msg[100];
sprintf(msg, "WORKER %d READY\n", workers[worker_index].id);
fd_set set;
FD_ZERO(&set);
FD_SET(worker_pipe[0], &set);
while(1){
int ready = select(worker_pipe[0]+1, &set, NULL, NULL, NULL);
if (ready < 0) {
perror("select");
exit(1);
}
printf("message get 1\n");
if (FD_ISSET(worker_pipe[0], &set)) {
printf("message get 2\n");
// data is available, read from the pipe
ssize_t nbytes = read(worker_pipe[0], &msg, sizeof(msg));
if (nbytes < 0) {
perror("read");
exit(1);
}
printf("Worker message received from Dispatcher: %s\n", msg);
}
close(worker_pipe[1]);
// functions of worker
// ...
// change status of the worker in shared memory
strcpy(workers[worker_index].status, "ready");
printf("JOB FINISHED Worker %d Status %s\n", workers[worker_index].id, workers[worker_index].status);
// sleep(3);
// printf("I am Worker and Im alive\n");
}
}
int main() {
pthread_t dispatcher_thread;
int dispatcher_pipe[n_workers][2];
workers_size = n_workers * sizeof(worker_data);
int workers_fd = shm_open("workers", O_CREAT | O_RDWR, 0666);
ftruncate(workers_fd, workers_size);
workers = mmap(NULL, workers_size, PROT_READ | PROT_WRITE, MAP_SHARED, workers_fd, 0);
pid_workers = malloc(n_workers * sizeof(pid_t));
// create semaphores
sem_unlink("sem_log");
sem_log = sem_open("sem_log", O_CREAT | O_EXCL, 0700, 1);
dispatcher_args* disp_args = malloc(sizeof(dispatcher_args));
disp_args->dispatcher_pipe= (int **) dispatcher_pipe;
// creating worker processes
for (int i = 0; i < n_workers; i++) {
worker_data worker;
worker.id = i+1;
strcpy(worker.status, "ready");
workers[i] = worker;
// create the unnamed pipe for the dispatcher and corresponding association
if (pipe(dispatcher_pipe[i]) == -1) {
printf("Error creating dispatcher pipe n1\n");
exit(0);
}
pid_workers[i] = fork();
if (pid_workers[i] == 0) {
// worker process
Worker(dispatcher_pipe[i], i);
// printf("Hey\n");
} else if (pid_workers[i] > 0) {
// parent process
} else {
// error forking
perror("fork");
exit(EXIT_FAILURE);
}
}
// create dispatcher thread
pthread_create(&dispatcher_thread, NULL, dispatcher, disp_args);
// wait for the threads to finish
pthread_join(dispatcher_thread, NULL);
// clean up
munmap(workers, workers_size);
close(workers_fd);
shm_unlink("workers");
sem_close(sem_log);
sem_unlink("sem_log");
free(pid_workers);
free(workers);
free(disp_args);
return 0;
}