I've used IPC message queues in real commercial products before.
I didn't have to use signals. I used `nanosleep` to wait, so this avoids the messiness of `EINTR`.
Below is some skeleton code. It uses only one message queue and several message types (one for main, one for each harbor and one for each ship).
It allows:
- The main process to communicate with all ships and harbors.
- Any harbor to communicate with other harbors and all ships.
- Any ship to communicate with other ships and all harbors.
In the code below, I've shown some harbor code. The ship code would be similar, as would the main process code.
The code is skeletal/incomplete. But, it shows the basic mechanism.
I created a single struct for (e.g.) harbor data. This is used in both the harbor "current state" array and in the messages.
It might be better to have separate types for harbor current state and the harbor specific data. That is, instead of `struct harbor`, we might have `struct harbor_state` and `struct harbor_msg`.
Anyway, here is the code. It compiles but is not tested. It is annotated:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
// struct ship -- per-ship record. The same layout is used for the entries of
// the ships array and for the ship member of the payload union in struct msg.
struct ship {
long ship_mtyp; // msgtyp used when sending a message to this ship
int ship_idx; // index into ships array
pid_t ship_pid; // ship's process id
// ship specific data ...
int ship_cargo; // NOTE(review): not read or written by the code shown here
};
// struct harbor -- per-harbor record. The same layout is used for the entries
// of the harbors array and for the harbor member of the payload union in
// struct msg.
struct harbor {
long harbor_mtyp; // msgtyp used when sending a message to this harbor
int harbor_idx; // index into harbors array
pid_t harbor_pid; // harbor's process id
// harbor specific data ...
int harbor_nship; // NOTE(review): not read or written by the code shown here
};
// struct main -- data belonging to the main process; also the main member of
// the payload union in struct msg. (The tag "main" lives in the struct tag
// namespace, so it does not clash with the function main.)
struct main {
long main_mtyp; // msgtyp
int main_total; // NOTE(review): not read or written by the code shown here
};
// enum type -- discriminator stored in msg.msg_type; tells the receiver which
// member of the msg_data union the message carries.
enum type {
TYPE_SHIP, // payload is msg_data.ship
TYPE_HARBOR, // payload is msg_data.harbor
TYPE_MAIN, // payload is msg_data.main
};
// struct msg -- the buffer handed to msgsnd/msgrcv.
// msg_mtyp is the leading long that the message queue calls use as the
// message type (here: the recipient's msgtyp); everything after it is the
// message body, whose size is what MSGLEN computes.
struct msg {
long msg_mtyp; // recipient's msgtyp
enum type msg_type; // selects the valid member of msg_data
union {
struct ship ship;
struct harbor harbor;
struct main main;
} msg_data;
};
// number of ships / harbors (sizes of the arrays below)
#define NSHIP 20
#define NHARBOR 5
// each process's view of every harbor and ship
struct harbor harbors[NHARBOR];
struct ship ships[NSHIP];
// counter that is pre-incremented to hand out a distinct msgtyp to main and
// to each ship and harbor
long mtyp_uniq = 0; // unique msgtyp
// msgtyp used when sending a message to the main process
long main_mtyp;
// most recent main data received in a TYPE_MAIN message
struct main main_data;
int msqid; // message queue id
// size of the message body passed to msgsnd/msgrcv: the whole struct msg
// minus the leading long message type
#define MSGLEN (sizeof(struct msg) - sizeof(long))
// time value in nanoseconds
typedef long long tod_t;
// nanoseconds per second
#define TODNS 1000000000
#define INTVNS TODNS // wakeup every second
// todget -- get time of day in nanoseconds
tod_t
todget(void)
{
struct timespec ts;
tod_t tod;
clock_gettime(CLOCK_MONOTONIC,&ts);
tod = ts.tv_sec;
tod *= 100000000;
tod += ts.tv_nsec;
return tod;
}
// time (as returned by todget) at which waitfor last returned; 0 until the
// first call
tod_t todold;

// waitfor -- wait for next interval
//
// Sleeps until INTVNS nanoseconds have passed since the previous return of
// this function (or since "now" on the very first call), then records the
// current time in todold. If the interval has already elapsed, it returns
// without sleeping.
void
waitfor(void)
{
    tod_t now = todget();

    // first call: there is no previous firing time, so measure from now
    tod_t previous = (todold != 0) ? todold : now;

    // time left until the next firing
    tod_t remaining = (previous + INTVNS) - now;

    if (remaining > 0) {
        struct timespec delay = {
            .tv_sec = remaining / TODNS,
            .tv_nsec = remaining % TODNS,
        };

        nanosleep(&delay, NULL);

        // re-read the clock so todold reflects when we actually woke up
        now = todget();
    }

    todold = now;
}
// harbor_rcv -- receive all current/pending message sent to this harbor
void
harbor_rcv(struct harbor *self)
{
struct msg msg;
int flags = IPC_NOWAIT;
ssize_t len;
struct ship *smsg;
struct ship *ship;
struct harbor *hmsg;
struct harbor *harbor;
struct main *mmsg;
while (1) {
len = msgrcv(msqid,&msg,MSGLEN,self->harbor_mtyp,flags);
// nothing to process
if (len == 0)
break;
// error
if (len < 0) {
// no message
if (errno == EAGAIN)
break;
perror("msgrcv");
exit(1);
}
// process the message
switch (msg.msg_type) {
case TYPE_SHIP:
smsg = &msg.msg_data.ship;
ship = &ships[smsg->ship_idx];
*ship = *smsg;
break;
case TYPE_HARBOR:
hmsg = &msg.msg_data.harbor;
harbor = &harbors[hmsg->harbor_idx];
*harbor = *hmsg;
break;
case TYPE_MAIN:
mmsg = &msg.msg_data.main;
main_data = *mmsg;
break;
}
}
}
// harbor_process -- perform all actions for given harbor
void
harbor_process(struct harbor *self)
{
struct msg msg;
int flags = 0;
while (1) {
// wait until we should send messages
waitfor();
// receive messages
harbor_rcv(self);
// send our harbor data to all _other_ harbors
for (int harboridx = 0; harboridx < NHARBOR; ++harboridx) {
struct harbor *harbor = &harbors[harboridx];
if (harbor == self)
continue;
// process incoming messages
harbor_rcv(self);
msg.msg_data.harbor = *self;
msg.msg_mtyp = harbor->harbor_mtyp;
msgsnd(msqid,&msg,MSGLEN,flags);
}
// send our harbor data to all ships
for (int shipidx = 0; shipidx < NSHIP; ++shipidx) {
struct ship *ship = &ships[shipidx];
// process incoming messages
harbor_rcv(self);
msg.msg_data.harbor = *self;
msg.msg_mtyp = ship->ship_mtyp;
msgsnd(msqid,&msg,MSGLEN,flags);
}
// send our harbor data to main
msg.msg_data.harbor = *self;
msg.msg_mtyp = main_mtyp;
msgsnd(msqid,&msg,MSGLEN,flags);
// process incoming messages
harbor_rcv(self);
}
exit(0);
}
// ship_process -- perform all actions for given ship
//
// self: this ship's entry in ships[].
//
// Never returns. It is called in a freshly forked child from inside main's
// launch loop; returning would let the child carry on with that loop and
// fork further processes itself.
void
ship_process(struct ship *self)
{
    // TODO -- similar to harbor_process
    (void) self;

    exit(0);
}
// main -- create the message queue, assign a msgtyp to main and to every
// ship and harbor, then fork one process per ship and per harbor.
//
// Exits with status 1 if the queue cannot be created or a fork fails.
int
main(void)
{
    pid_t pid;

    // Permission bits are required: with IPC_CREAT alone the queue is
    // created with mode 0 and unprivileged msgsnd/msgrcv fail with EACCES.
    msqid = msgget(IPC_PRIVATE, IPC_CREAT | 0600);
    if (msqid < 0) {
        perror("msgget");
        exit(1);
    }

    main_mtyp = ++mtyp_uniq;

    // initialize all ships
    for (int shipidx = 0; shipidx < NSHIP; ++shipidx) {
        struct ship *ship = &ships[shipidx];

        ship->ship_mtyp = ++mtyp_uniq;
        ship->ship_idx = shipidx;
    }

    // initialize all harbors
    for (int harboridx = 0; harboridx < NHARBOR; ++harboridx) {
        struct harbor *harbor = &harbors[harboridx];

        harbor->harbor_mtyp = ++mtyp_uniq;
        harbor->harbor_idx = harboridx;
    }

    // launch all ships
    for (int shipidx = 0; shipidx < NSHIP; ++shipidx) {
        struct ship *ship = &ships[shipidx];

        pid = fork();
        if (pid < 0) {
            perror("fork");
            // remove the queue so it does not outlive the program
            msgctl(msqid, IPC_RMID, NULL);
            exit(1);
        }
        if (pid == 0) {
            ship_process(ship);
            // a child must never fall back into this launch loop
            exit(0);
        }
        ship->ship_pid = pid;
    }

    // launch all harbors
    for (int harboridx = 0; harboridx < NHARBOR; ++harboridx) {
        struct harbor *harbor = &harbors[harboridx];

        pid = fork();
        if (pid < 0) {
            perror("fork");
            msgctl(msqid, IPC_RMID, NULL);
            exit(1);
        }
        if (pid == 0) {
            harbor_process(harbor);
            exit(0);
        }
        harbor->harbor_pid = pid;
    }

    // process incoming messages, etc.
    while (1) {
        // pace the loop instead of spinning at 100% CPU
        waitfor();

        // TODO -- receive messages addressed to main_mtyp and act on them
    }

    return 0;
}
UPDATE:
but each ship has to be able to communicate with each harbor so I could end up with thousands of pipes, but how can I interrupt the execution after a certain time has passed in each "ship" and "harbor" process? –
Vasetto
Unfortunately, having "thousands of pipes" implies having thousands of processes. This won't scale. With any number of threads/processes that exceeds a certain amount (e.g. 16 or so), the system will spend most of its time context switching between them.
So, a different model is the "worker thread" model. Define `N` threads that process messages. Here `N` is relatively small (e.g. 16). Each pulls from the same `msgtyp`, does processing, and loops.
You might be better off with threads rather than processes. Or, some processes that have a certain number of threads.
All this depends more on what data the ships have, what data the harbors have, and how they interact. So, we'd need to have the details on the actual data, what action triggers messages from ships to ships/harbors and/or messages from harbors to ships/harbors.