
I need to create a project in which a main program creates a given number of child processes, and each child calls exec. There are two types of child process, and they communicate using message queues. Every second, all children have to send their status to the main program (let me know if you need more detail about what information is required). My question is: how can all the children send this information to the main program without getting "Interrupted system call" because a child was waiting for or writing a message on the message queue?

char *receiveMessage(int msgQId, long tipo) {
    my_msg_buf msgBuf;
    sigset_t mask;

    sigemptyset(&mask);
    sigaddset(&mask, SIGUSR1);


    sigprocmask(SIG_BLOCK, &mask, NULL );
    while(1)
    if (msgrcv(msgQId, (void *)&msgBuf, MAX_LINE_LENGTH, tipo, IPC_NOWAIT) == -1) {
        if(errno != ENOMSG && errno != EINTR){
            fprintf(stderr, "Ricevuto errore: %s\n", strerror(errno));
            return NULL;
        }
        if(errno == ENOMSG)
            return NULL;
    }else break;

    sigprocmask(SIG_UNBLOCK, &mask, NULL );

    return strdup(msgBuf.testo);
}

I tried looping on the receive and retrying whenever errno equals EINTR, but it doesn't seem to work, and the children never seem to terminate. I was also thinking about setting an internal timer inside each child, but I don't know how to handle the message queues or where to check the time during execution.

Vasetto
  • Are you installing signal handlers? If not, how are you getting EINTR? See https://stackoverflow.com/questions/4959524/when-to-check-for-eintr-and-repeat-the-function-call - you probably should not be getting EINTR at all. Post your message loop code perhaps, rather than saying it "acts weird." – John Zwinck Jan 08 '23 at 17:33
  • @JohnZwinck Thank you for your answer, I edited the question, I am also wondering if signals are the best way to handle this kind of situation or is there a better way – Vasetto Jan 08 '23 at 17:45
  • How about replacing your message queues and signals with a simple `pipe()` (https://man7.org/linux/man-pages/man2/pipe.2.html)? – John Zwinck Jan 08 '23 at 17:50
  • @JohnZwinck The project has a random number of ships going to random harbors on a map. I don't know where each ship will go ahead of time, so every harbor must be able to communicate with each ship. I did it using a common message queue among all processes: the ships write to it, setting the message type equal to the PID of the harbor they are going to. My problem is how to stop writing a message or waiting for one after the timer expires. I'm sorry if it's convoluted, it's an exam project. – Vasetto Jan 08 '23 at 18:04
  • OK, so you have one "main" process, several "harbor" processes, and several "ship" processes. You can create one `pipe()` in the main process every time before it forks a child process. Meaning each harbor process and each ship process will have one writeable pipe it can use to send messages, and the main process will read from all the pipes (using select() or poll()). Note that the SysV message queue you're using now does not support select() or poll() on Linux, which is a significant limitation. – John Zwinck Jan 08 '23 at 18:08
  • @JohnZwinck I'm sorry for still bugging you, but each ship has to be able to communicate with each harbor, so I could end up with thousands of pipes. And how can I interrupt the execution after a certain time has passed in each "ship" and "harbor" process? – Vasetto Jan 08 '23 at 18:30
  • Then make another pipe per harbor process in the reverse direction, to let the main process send data to each harbor. That way you do not need every ship to talk to every harbor, you only need ships to talk to the main process, and the main process can tell the harbors what they need to know. As for making each ship and harbor process know when some time has elapsed, that's easy: poll() and select() have a timeout argument, so just set that and each process will be woken up after that much time even if no messages are received. – John Zwinck Jan 08 '23 at 20:09
  • @JohnZwinck Thank you, I will try using poll(), but the only processes that I can keep waiting for messages are the ships, as the harbors need to be able to communicate with other ships if they send a request. – Vasetto Jan 09 '23 at 16:17
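
The timeout idea from the comments above can be sketched roughly as follows. This is only an illustration under assumptions: the helper name wait_readable is hypothetical, and fd is assumed to be the read end of a pipe() created before fork(). It shows poll() waking the caller after a timeout even when no message arrives.

```c
#include <errno.h>
#include <poll.h>
#include <unistd.h>

// wait_readable -- wait up to timeout_ms for fd to have data (or EOF)
// (hypothetical helper, not part of the original discussion)
// returns 1 if fd is ready, 0 on timeout, -1 on error
int
wait_readable(int fd, int timeout_ms)
{
    struct pollfd pfd = { .fd = fd, .events = POLLIN };
    int rc;

    // retry if a signal handler interrupted the wait
    do {
        rc = poll(&pfd, 1, timeout_ms);
    } while (rc < 0 && errno == EINTR);

    if (rc < 0)
        return -1;

    return (rc > 0) ? 1 : 0;
}
```

A process could call wait_readable(fd, 1000) once per loop iteration and send its status whenever the call returns 0. Note that retrying after EINTR restarts the full timeout, so a process that receives frequent signals would need to track the remaining time itself.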

1 Answer


I've used IPC message queues in real commercial products before.

I didn't have to use signals. I used nanosleep to wait, so this avoids the messiness of EINTR.

Below is some skeleton code. It uses only one message queue and several message types (one for main, one for each harbor and one for each ship).

It allows:

  1. the main process to communicate with all ships and harbors.

  2. Any harbor can communicate with other harbors and all ships.

  3. Any ship can communicate with other ships and all harbors.


In the code below, I've shown some harbor code. The ship code would be similar, as would the main process code.

The code is skeletal/incomplete. But, it shows the basic mechanism.

I created a single struct for (e.g.) harbor data. This is used in both the harbor "current state" array and in the messages.

It might be better to have separate types for harbor current state and the harbor specific data. That is, instead of struct harbor, we might have struct harbor_state and struct harbor_msg.

Anyway, here is the code. It compiles but is not tested. It is annotated:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

struct ship {
    long ship_mtyp;                         // msgtyp
    int ship_idx;                           // index into ships array
    pid_t ship_pid;                         // ship's process id

    // ship specific data ...
    int ship_cargo;
};

struct harbor {
    long harbor_mtyp;                       // msgtyp
    int harbor_idx;                         // index into harbors array
    pid_t harbor_pid;                       // harbor's process id

    // harbor specific data ...
    int harbor_nship;
};

struct main {
    long main_mtyp;                         // msgtyp
    int main_total;
};

enum type {
    TYPE_SHIP,
    TYPE_HARBOR,
    TYPE_MAIN,
};

struct msg {
    long msg_mtyp;
    enum type msg_type;
    union {
        struct ship ship;
        struct harbor harbor;
        struct main main;
    } msg_data;
};

#define NSHIP       20
#define NHARBOR     5

struct harbor harbors[NHARBOR];
struct ship ships[NSHIP];

long mtyp_uniq = 0;                     // unique msgtyp
long main_mtyp;
struct main main_data;

int msqid;                              // message queue id

#define MSGLEN  (sizeof(struct msg) - sizeof(long))

typedef long long tod_t;
#define TODNS       1000000000
#define INTVNS      TODNS               // wakeup every second

// todget -- get time of day in nanoseconds
tod_t
todget(void)
{
    struct timespec ts;
    tod_t tod;

    clock_gettime(CLOCK_MONOTONIC,&ts);

    tod = ts.tv_sec;
    tod *= TODNS;                       // seconds to nanoseconds
    tod += ts.tv_nsec;

    return tod;
}

tod_t todold;

// waitfor -- wait for next interval
void
waitfor(void)
{
    tod_t todnow;
    tod_t todfire;
    tod_t todsleep;
    struct timespec ts;

    // get current time
    todnow = todget();

    // initialize previous firing time
    if (todold == 0)
        todold = todnow;

    // get firing time of next batch of outgoing messages
    todfire = todold + INTVNS;

    // get amount to sleep
    todsleep = todfire - todnow;

    // sleep a bit
    if (todsleep > 0) {
        ts.tv_sec = todsleep / TODNS;
        ts.tv_nsec = todsleep % TODNS;
        nanosleep(&ts,NULL);

        todnow = todget();
    }

    todold = todnow;
}

// harbor_rcv -- receive all current/pending message sent to this harbor
void
harbor_rcv(struct harbor *self)
{
    struct msg msg;
    int flags = IPC_NOWAIT;
    ssize_t len;
    struct ship *smsg;
    struct ship *ship;
    struct harbor *hmsg;
    struct harbor *harbor;
    struct main *mmsg;

    while (1) {
        len = msgrcv(msqid,&msg,MSGLEN,self->harbor_mtyp,flags);

        // nothing to process
        if (len == 0)
            break;

        // error
        if (len < 0) {
            // no message of our type is queued (IPC_NOWAIT reports ENOMSG)
            if (errno == ENOMSG)
                break;

            perror("msgrcv");
            exit(1);
        }

        // process the message
        switch (msg.msg_type) {
        case TYPE_SHIP:
            smsg = &msg.msg_data.ship;
            ship = &ships[smsg->ship_idx];
            *ship = *smsg;
            break;

        case TYPE_HARBOR:
            hmsg = &msg.msg_data.harbor;
            harbor = &harbors[hmsg->harbor_idx];
            *harbor = *hmsg;
            break;

        case TYPE_MAIN:
            mmsg = &msg.msg_data.main;
            main_data = *mmsg;
            break;
        }
    }
}

// harbor_process -- perform all actions for given harbor
void
harbor_process(struct harbor *self)
{
    // every message we send carries harbor data, so tag it once here
    struct msg msg = { .msg_type = TYPE_HARBOR };
    int flags = 0;

    while (1) {
        // wait until we should send messages
        waitfor();

        // receive messages
        harbor_rcv(self);

        // send our harbor data to all _other_ harbors
        for (int harboridx = 0;  harboridx < NHARBOR;  ++harboridx) {
            struct harbor *harbor = &harbors[harboridx];
            if (harbor == self)
                continue;

            // process incoming messages
            harbor_rcv(self);

            msg.msg_data.harbor = *self;
            msg.msg_mtyp = harbor->harbor_mtyp;
            msgsnd(msqid,&msg,MSGLEN,flags);
        }

        // send our harbor data to all ships
        for (int shipidx = 0;  shipidx < NSHIP;  ++shipidx) {
            struct ship *ship = &ships[shipidx];

            // process incoming messages
            harbor_rcv(self);

            msg.msg_data.harbor = *self;
            msg.msg_mtyp = ship->ship_mtyp;
            msgsnd(msqid,&msg,MSGLEN,flags);
        }

        // send our harbor data to main
        msg.msg_data.harbor = *self;
        msg.msg_mtyp = main_mtyp;
        msgsnd(msqid,&msg,MSGLEN,flags);

        // process incoming messages
        harbor_rcv(self);
    }

    exit(0);
}

// ship_process -- perform all actions for given ship
void
ship_process(struct ship *self)
{

    (void) self;

    // TODO -- similar to harbor_process

    // the child must not return into main's fork loop
    exit(0);
}

int
main(void)
{
    pid_t pid;

    // permission bits are required, or msgsnd/msgrcv fail with EACCES
    msqid = msgget(IPC_PRIVATE,IPC_CREAT | 0600);
    if (msqid < 0) {
        perror("msgget");
        exit(1);
    }

    main_mtyp = ++mtyp_uniq;

    // initialize all ships
    for (int shipidx = 0;  shipidx < NSHIP;  ++shipidx) {
        struct ship *ship = &ships[shipidx];
        ship->ship_mtyp = ++mtyp_uniq;
        ship->ship_idx = shipidx;
    }

    // initialize all harbors
    for (int harboridx = 0;  harboridx < NHARBOR;  ++harboridx) {
        struct harbor *harbor = &harbors[harboridx];
        harbor->harbor_mtyp = ++mtyp_uniq;
        harbor->harbor_idx = harboridx;
    }

    // launch all ships
    for (int shipidx = 0;  shipidx < NSHIP;  ++shipidx) {
        struct ship *ship = &ships[shipidx];
        pid = fork();
        if (pid == 0)
            ship_process(ship);
        else
            ship->ship_pid = pid;
    }

    // launch all harbors
    for (int harboridx = 0;  harboridx < NHARBOR;  ++harboridx) {
        struct harbor *harbor = &harbors[harboridx];
        pid = fork();
        if (pid == 0)
            harbor_process(harbor);
        else
            harbor->harbor_pid = pid;
    }

    // process incoming messages, etc.
    while (1) {
    }

    return 0;
}
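
The main-process loop at the end of the skeleton is left empty. As a rough sketch of what draining a queue without blocking might look like, the function below reads every pending message of one type and stops on ENOMSG. The names status_msg, STATUS_LEN and drain_status are hypothetical and stand in for the real message layout; this is an illustration, not the code from the skeleton.

```c
#include <errno.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>

// hypothetical status message; msgsnd/msgrcv require the leading long
struct status_msg {
    long mtype;
    int value;
};

#define STATUS_LEN  (sizeof(struct status_msg) - sizeof(long))

// drain_status -- receive every pending message of the given type
// adds each received value to *sum
// returns the number of messages read, or -1 on a real error
int
drain_status(int qid, long mtype, int *sum)
{
    struct status_msg m;
    int count = 0;

    while (1) {
        if (msgrcv(qid, &m, STATUS_LEN, mtype, IPC_NOWAIT) < 0) {
            // nothing of this type is queued: we are done
            if (errno == ENOMSG)
                break;

            // interrupted by a signal handler: just retry
            if (errno == EINTR)
                continue;

            return -1;
        }

        *sum += m.value;
        ++count;
    }

    return count;
}
```

Because IPC_NOWAIT never blocks, the caller can run this once per interval (for example right after waitfor() returns) and then go back to sleep.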

UPDATE:

but each ship has to be able to communicate with each harbor, so I could end up with thousands of pipes. And how can I interrupt the execution after a certain time has passed in each "ship" and "harbor" process? – Vasetto

Unfortunately, having "thousands of pipes" implies having thousands of processes. This won't scale. Once the number of runnable threads/processes exceeds a certain amount (e.g. 16 or so, roughly the number of CPU cores), the system will spend much of its time context switching between them.

So, a different model is the "worker thread" model. Define N threads that process messages. Here N is relatively small (e.g. 16). Each pulls from the same msgtyp, does processing, and loops.
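
A minimal sketch of that worker model, assuming POSIX threads and a private SysV queue. Everything named here (NWORKER, WORK_MTYP, STOP, work_msg, run_pool) is hypothetical, and the "processing" is just summing integers so that the result is easy to check.

```c
#include <errno.h>
#include <pthread.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>

#define NWORKER     4       // assumed pool size
#define WORK_MTYP   1L      // assumed msgtyp shared by all workers
#define STOP        (-1)    // assumed sentinel telling a worker to quit

struct work_msg {
    long mtype;
    int item;
};

#define WORK_LEN    (sizeof(struct work_msg) - sizeof(long))

static int work_qid;
static long work_total;
static pthread_mutex_t work_lock = PTHREAD_MUTEX_INITIALIZER;

// worker -- pull messages from the shared msgtyp until told to stop
static void *
worker(void *arg)
{
    struct work_msg m;

    (void) arg;

    while (1) {
        if (msgrcv(work_qid, &m, WORK_LEN, WORK_MTYP, 0) < 0) {
            if (errno == EINTR)
                continue;
            break;
        }

        if (m.item == STOP)
            break;

        // "processing": add the item to the shared total
        pthread_mutex_lock(&work_lock);
        work_total += m.item;
        pthread_mutex_unlock(&work_lock);
    }

    return NULL;
}

// run_pool -- feed items 1..nitem to the workers; returns their sum
long
run_pool(int nitem)
{
    pthread_t tid[NWORKER];
    struct work_msg m = { .mtype = WORK_MTYP };
    int nstarted = 0;

    work_qid = msgget(IPC_PRIVATE, IPC_CREAT | 0600);
    if (work_qid < 0)
        return -1;
    work_total = 0;

    for (int i = 0;  i < NWORKER;  ++i) {
        if (pthread_create(&tid[nstarted], NULL, worker, NULL) == 0)
            ++nstarted;
    }

    // queue the work; only send if at least one worker can drain it
    for (int i = 1;  nstarted > 0 && i <= nitem;  ++i) {
        m.item = i;
        msgsnd(work_qid, &m, WORK_LEN, 0);
    }

    // one STOP per worker; same-type messages are delivered in FIFO order
    for (int i = 0;  i < nstarted;  ++i) {
        m.item = STOP;
        msgsnd(work_qid, &m, WORK_LEN, 0);
    }

    for (int i = 0;  i < nstarted;  ++i)
        pthread_join(tid[i], NULL);

    msgctl(work_qid, IPC_RMID, NULL);

    return work_total;
}
```

Calling run_pool(100) should return 5050 regardless of how the items were distributed among the workers, since each message is received by exactly one thread. Build with -pthread on systems that need it.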

You might be better off with threads rather than processes. Or, some processes that have a certain number of threads.

All this depends more on what data the ships have, what data the harbors have, and how they interact. So, we'd need to have the details on the actual data, what action triggers messages from ships to ships/harbors and/or messages from harbors to ships/harbors.

Craig Estey
  • Thank you for the great answer. My professor never talked about threads, so I don't have experience using them. In the project he said he wants the ship processes (up to 1k) to communicate with the harbor processes about giving and retrieving items. In the code I have written so far, it starts getting really slow and moving the wrong number of items once I have too many ships, where each has to send information to the others. Can I ask you about threads? Would it be better to have fewer processes where each one has a thread for every ship? Can it cause problems performance-wise? Thank you – Vasetto Jan 09 '23 at 16:15