3

Below is the code for an assignment on processor farming. The focus is on the comments with "HERE $resp is always the same/different". That's my problem: when the worker process does it's job and sends the response data to the farmer, the farmer always receives the same response data (the same pointer address), even though worker sends different data every time.

Example: workers send data at addresses: 0x7fff42318a90,0x7ffddba97390,0x7ffc69e8e060 etc. and farmer keeps receiving data from only one address 0x7ffdb1496f30

I've done my best to abstract the code and question as much as possible. If I've omitted important information please let me know, I'm new to process management programming and I could use some guidance.

UPDATE: also printing the contents of resp s.a resp.b where b is an integer returns the same value, even though the value is different in worker.

UPDATE: I tried writing some runnable code only this time the worker might not be receiving.

//both in farmer and in worker

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h>         // for execlp
#include <mqueue.h>         // for mq

typedef struct{

    int a;

} REQUEST;

typedef struct{

    int b;

} RESPONSE;

static char mq_farmer[80];
static char mq_worker[80];

//farmer:

int main (int argc, char * argv[])
{

    REQUEST req;
    RESPONSE resp;

    sprintf (mq_farmer, "/mq_request_%s_%d", "foo", getpid());
    sprintf (mq_worker, "/mq_response_%s_%d", "bar", getpid());

    //define attr
    struct mq_attr attr;

    attr.mq_maxmsg= 10;

    attr.mq_msgsize = sizeof(REQUEST);
    mqd_t reqQueue = mq_open(mq_farmer, O_WRONLY | O_CREAT | O_EXCL, 0600, &attr);

    attr.mq_msgsize = sizeof(RESPONSE);
    mqd_t respQueue = mq_open(mq_worker, O_WRONLY | O_CREAT | O_EXCL, 0600, &attr);

    //  * create the child processes (see process_test() and message_queue_test())
    int i;
    for(i = 0; i < 3; i++)
        {
            pid_t processID = fork();
            if(processID < 0)
                {
                    //error
                }

            else if(processID == 0)
                {
                    //some code

                    execlp("./worker","worker", getpid(), i, NULL);
                }
        }

    pid_t pid = fork();


    if(pid < 0)
        {
            //error
        }
    else
        {
            if(pid == 0) //receiving done here
                {
                    for(i = 0; i < 3; i++)
                        {

                            // read the messages from the worker queue
                            mqd_t received = mq_receive (respQueue, (char *) &resp, sizeof(resp), NULL);
                            printf("Farmer received worker response: %p\n with value %d\n", &resp, resp.b);
                            //HERE &resp is always the same


                        }

                    // end worker process
                    req.a = -1;
                    mqd_t sent = mq_send(reqQueue, (char *) &req,sizeof(req), 0);

                }
            else //sending done here
                {
                    for(i = 0; i < 3; i++)
                        {
                            req.a = i;
                            mqd_t sent = mq_send(reqQueue, (char *) &req,sizeof(req), 0);

                        }
                }


        }

    waitpid(pid, NULL, 0);
    mq_close(reqQueue);
    mq_close(respQueue);


    //clean up the message queues
    mq_unlink(mq_farmer);
    mq_unlink(mq_worker);

    return 0;
}

//worker:

int main (int argc, char * argv[])
{

    REQUEST req;
    RESPONSE resp;

    int arg1;

    sscanf(argv[1], "%d", &arg1);

    sprintf (mq_farmer, "/mq_request_%s_%d", "foo", arg1);
    sprintf (mq_worker, "/mq_response_%s_%d", "bar",arg1);

    mqd_t reqQueue = mq_open (mq_farmer, O_RDONLY);

    mqd_t respQueue = mq_open (mq_worker, O_WRONLY);

    while (true){

        //receiving
        mqd_t received = mq_receive (reqQueue, (char *) &req,
                                     sizeof(req), NULL);

        printf("Worker received %p with value %d\n", &req, req.a);

        //received stop signal
        if(req.a < 0){
            printf("stopping worker\n");
            break;
        }

        //waiting for farmer to fork 
        sleep(3);

        //do something with request data
        resp.b = req.a;

        //send response
        mqd_t sent = mq_send (respQueue, (char *) &resp,

                              sizeof (resp), NULL);

        printf("Worker sent response: %p\n", &resp);
        //HERE &resp is always different (doesn't print)
    }

    mq_close(reqQueue);
    mq_close(respQueue);


    //clean up the message queues
    mq_unlink(mq_farmer);
    mq_unlink(mq_worker);


    return 0;
}
Barmar
  • 741,623
  • 53
  • 500
  • 612
wraithie
  • 343
  • 1
  • 6
  • 19
  • Where is the code that prints `resp.a`? Where is the declaration of `resp`? – Barmar Sep 23 '16 at 19:55
  • @Barmar above is my attempt at a runnable code only worker doesn't receive I think, at least it doesn't print the "Worker received" message. – wraithie Sep 23 '16 at 22:44
  • there needs to be a separate `attr` structure for each message que, The posted code is using the same `attr` for both message queues. – user3629249 Sep 24 '16 at 23:23
  • 1
    when calling `execlp()` (which can fail) The code needs to allow for a possible failure by following the call with a (hopefully never executed) error message (usually using `perror()` ) and a call to `exit()`. – user3629249 Sep 24 '16 at 23:27
  • 1
    the `main()` will not cleanly compile because the parameter `argc` is not used. Never access beyond `argv[0]` without first checking `argc` to assure the other expected parameters exist. – user3629249 Sep 24 '16 at 23:29
  • EACH 'worker' process must have a separate message queue. Otherwise, all messages will go to the 'worker' process that first latches on to the message queue. – user3629249 Sep 24 '16 at 23:32
  • When calling any of the `scanf()` family of functions, (including `sscanf()` ) always check the returned value (not the parameter value) to assure the operation was successful. – user3629249 Sep 24 '16 at 23:35
  • when calling `execlp()` each of the parameters needs to be a NUL terminated string. The call to `getpid()` returns an int, not a NUL terminated string. The variable `i` is an int, not a NUL terminated string – user3629249 Sep 24 '16 at 23:39

1 Answers1

2

When you call mq_receive it places the data at the buffer pointed to by the second argument, which you give as &resp. It does not change the pointer itself.

&resp is a fixed address in the parent, unless you change it, which appears unlikely from the posted code [which does not show the definition of resp], so:

printf("Received worker response: %p\n", &resp);

You will always get the same value.

What you [probably] want to do is print what resp contains


UPDATE:

Okay, there were a few more bugs.

The big bug is that while you can have one queue for worker-to-farmer messages (i.e. the response queue), you can not use a single queue for requests to workers. They each need their own request queue.

Otherwise, a single worker can absorb/monopolize all requests, even ones that belong to others. If that happened, the farmer would likely see messages that were stamped from only that worker.

This is what you're seeing, because, the first worker [probably #0] has its mq_receive complete first. It is, then, so fast that it does all of the mq_receive/mq_send before any others can get to them.

It will then see a "stop" message and exit. If the others are "lucky", the first worker left the remaining stop messages in the queue. But, no request messages, so they never send a response.

Also, the response queue was opened by the farmer with O_WRONLY instead of O_RDONLY.

I've produced two versions of your program. One with annotations for bugs. Another that is cleaned up and working.


Here's the annotated version [please pardon the gratuitous style cleanup]:

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h>                     // for execlp
#include <mqueue.h>                     // for mq

typedef struct {
    int a;
} REQUEST;

typedef struct {
    int b;
} RESPONSE;

char *pgmname;

static char mq_farmer[80];
static char mq_worker[80];

int
main(int argc,char **argv)
{

    REQUEST req;
    RESPONSE resp;
    ssize_t sent;

    pgmname = argv[0];

    --argc;
    ++argv;

    sprintf(mq_farmer,"/mq_request_%s_%d","foo",getpid());
    sprintf(mq_worker,"/mq_response_%s_%d","bar",getpid());

    // define attr
    // NOTE/BUG: this can have random data in it
    struct mq_attr attr;

    attr.mq_maxmsg = 10;

    // NOTE/BUG: this is _the_ big one -- we're only doing a single request
    // queue -- each worker needs its _own_ request queue -- otherwise, a
    // single worker can _monopolize_ all messages for the other workers
    attr.mq_msgsize = sizeof(REQUEST);
    mqd_t reqQueue = mq_open(mq_farmer,O_WRONLY | O_CREAT | O_EXCL,0600,&attr);

    // NOTE/BUG: this should be opened for reading
    attr.mq_msgsize = sizeof(RESPONSE);
    mqd_t respQueue = mq_open(mq_worker,O_WRONLY | O_CREAT | O_EXCL,0600,&attr);

    // create the child processes (see process_test() and message_queue_test())
    int i;

    // NOTE/BUG: we must remember the child pid numbers so we can do waitpid
    // later
    for (i = 0; i < 3; i++) {
        pid_t processID = fork();

        if (processID < 0) {
            // error
        }

        else if (processID == 0) {
            // some code

            // NOTE/BUG: exec* takes strings so this is wrong
            execlp("./worker","worker",getpid(),i,NULL);
        }
    }

    // NOTE/BUG: on all mq_send/mq_receive, the return type is ssize_t and
    // _not_ mqd_t

    pid_t pid = fork();

    if (pid < 0) {
        // error
    }
    else {
        // receiving done here
        if (pid == 0) {
            for (i = 0; i < 3; i++) {

                // read the messages from the worker queue
                ssize_t received = mq_receive(respQueue,(char *) &resp,
                    sizeof(resp),NULL);

                printf("Farmer received worker response: %p with length %ld value %d\n",
                    &resp,received,resp.b);
                // HERE &resp is always the same
            }

            // end worker process
            req.a = -1;
            sent = mq_send(reqQueue,(char *) &req,sizeof(req),0);
            printf("Farmer sent stop -- sent=%ld\n",sent);

            // NOTE/BUG: we need to exit here
        }

        // sending done here
        else {
            for (i = 0; i < 3; i++) {
                req.a = i;
                sent = mq_send(reqQueue,(char *) &req,sizeof(req),0);
                printf("Farmer sent to i=%d -- sent=%ld\n",i,sent);
            }
        }

    }

    // NOTE/BUG: we're waiting on the double fork farmer, but _not_
    // on the actual worker pids
    waitpid(pid,NULL,0);

    mq_close(reqQueue);
    mq_close(respQueue);

    // clean up the message queues
    mq_unlink(mq_farmer);
    mq_unlink(mq_worker);

    return 0;
}

int
worker_main(int argc,char *argv[])
{

    REQUEST req;
    RESPONSE resp;
    ssize_t sent;

    int arg1;

    // NOTE/BUG: use getppid instead
    sscanf(argv[1],"%d",&arg1);
    printf("worker: my index is %d ...\n",arg1);

    sprintf(mq_farmer,"/mq_request_%s_%d","foo",arg1);
    sprintf(mq_worker,"/mq_response_%s_%d","bar",arg1);

    mqd_t reqQueue = mq_open(mq_farmer,O_RDONLY);

    mqd_t respQueue = mq_open(mq_worker,O_WRONLY);

    while (1) {
        // receiving
        ssize_t received = mq_receive(reqQueue,(char *) &req,
            sizeof(req),NULL);

        printf("Worker received %p with length %ld value %d\n",
            &req,received,req.a);

        // received stop signal
        if (req.a < 0) {
            printf("stopping worker\n");
            break;
        }

        // waiting for farmer to fork
        sleep(3);

        // do something with request data
        resp.b = req.a;

        // send response
        // NOTE/BUG: last argument is unsigned int and _not_ pointer
#if 0
        sent = mq_send(respQueue,(char *) &resp,sizeof(resp),NULL);
#else
        sent = mq_send(respQueue,(char *) &resp,sizeof(resp),0);
#endif

        printf("Worker sent response %p with length %ld value %d\n",
            &req,sent,req.a);
        // HERE &resp is always different (doesn't print)
    }

    mq_close(reqQueue);
    mq_close(respQueue);

    // clean up the message queues
    // NOTE/BUG: farmer should do this -- not worker
    mq_unlink(mq_farmer);
    mq_unlink(mq_worker);

    return 0;
}

Here's the cleaned up and working version. Note that, for ease/simplicity, I combined both the farmer and worker programs into a single one, using a little bit of trickery in main:

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h>                     // for execlp
#include <mqueue.h>                     // for mq

typedef struct {
    int a;
} REQUEST;

typedef struct {
    int b;
} RESPONSE;

char *pgmname;
int opt_x;
int opt_W;

#define WORKNR      3

char mqfile_to_farmer[80];
char mqfile_to_worker[80];

struct mq_attr attr;

pid_t ppid;

// per-worker control
struct worker {
    pid_t wk_pid;
    mqd_t wk_req;
    char wk_mqfile[80];
};

struct worker worklist[WORKNR];

void
worker(void)
{

    REQUEST req;
    RESPONSE resp;
    ssize_t sent;

    ppid = getppid();

    printf("worker: my index is %d ...\n",opt_W);

    sprintf(mqfile_to_farmer,"/mq_response_%d",ppid);
    sprintf(mqfile_to_worker,"/mq_request_%d_%d",ppid,opt_W);

    mqd_t reqQueue = mq_open(mqfile_to_worker,O_RDONLY);
    mqd_t respQueue = mq_open(mqfile_to_farmer,O_WRONLY);

    while (1) {
        // receiving
        errno = 0;
        ssize_t received = mq_receive(reqQueue,(char *) &req,
            sizeof(req),NULL);

        printf("Worker %d received %p with length %ld value %d -- %s\n",
            opt_W,&req,received,req.a,strerror(errno));
        if (received < 0)
            exit(77);

        // received stop signal
        if (req.a < 0) {
            printf("stopping worker\n");
            break;
        }

        // do something with request data
        resp.b = req.a;

        // send response
        errno = 0;
        sent = mq_send(respQueue,(char *) &resp,sizeof(resp),0);

        printf("Worker %d sent response %p with length %ld value %d -- %s\n",
            opt_W,&req,sent,req.a,strerror(errno));
        // HERE &resp is always different (doesn't print)
        if (sent < 0)
            exit(78);
    }

    mq_close(reqQueue);
    mq_close(respQueue);

    exit(0);
}

void
farmer(void)
{

    REQUEST req;
    RESPONSE resp;
    ssize_t sent;
    struct worker *wk;

    ppid = getpid();

    sprintf(mqfile_to_farmer,"/mq_response_%d",ppid);

    attr.mq_maxmsg = 10;

    attr.mq_msgsize = sizeof(REQUEST);
    mqd_t respQueue = mq_open(mqfile_to_farmer,
        O_RDONLY | O_CREAT | O_EXCL,0600,&attr);
    if (respQueue < 0) {
        printf("farmer: respQueue open fault -- %s\n",strerror(errno));
        exit(1);
    }

    // create the child processes (see process_test() and message_queue_test())
    int i;

    // create the separate request queues
    for (i = 0; i < WORKNR; i++) {
        wk = &worklist[i];
        attr.mq_msgsize = sizeof(RESPONSE);
        sprintf(wk->wk_mqfile,"/mq_request_%d_%d",ppid,i);
        wk->wk_req = mq_open(wk->wk_mqfile,O_WRONLY | O_CREAT | O_EXCL,0600,
            &attr);
        if (wk->wk_req < 0) {
            printf("farmer: wk_req open fault -- %s\n",strerror(errno));
            exit(1);
        }
    }

    for (i = 0; i < WORKNR; i++) {
        wk = &worklist[i];

        pid_t pid = fork();

        if (pid < 0) {
            perror("fork");
            exit(9);
        }

        if (pid != 0) {
            wk->wk_pid = pid;
            continue;
        }

        // NOTE/FIX: exec* takes strings so this is the correct way
        if (opt_x) {
            char xid[20];
            sprintf(xid,"-W%d",i);
            execlp(pgmname,pgmname,xid,NULL);
            perror("execlp");
            exit(7);
        }

        // simulate what exec would do -- call it direct
        opt_W = i;
        worker();
    }

    pid_t pid = fork();

    if (pid < 0) {
        perror("fork2");
        exit(5);
    }

    // receiving done here
    if (pid == 0) {
        for (i = 0; i < WORKNR; i++) {

            // read the messages from the worker queue
            ssize_t received = mq_receive(respQueue,(char *) &resp,
                sizeof(resp),NULL);

            printf("Farmer received worker response: %p with length %ld value %d\n",
                &resp,received,resp.b);
            // HERE &resp is always the same
        }

        // end worker process
        for (i = 0; i < WORKNR; i++) {
            wk = &worklist[i];
            req.a = -1;
            sent = mq_send(wk->wk_req,(char *) &req,sizeof(req),0);
            printf("Farmer sent stop -- sent=%ld\n",sent);
        }

        // exit the farmer's receiver
        printf("farmer: receiver exiting ...\n");
        exit(0);
    }

    // sending done here
    else {
        for (i = 0; i < WORKNR; i++) {
            wk = &worklist[i];
            req.a = i;
            sent = mq_send(wk->wk_req,(char *) &req,sizeof(req),0);
            printf("Farmer sent to i=%d -- sent=%ld\n",i,sent);
        }

        // wait for farmer's receiver to complete
        printf("farmer: waiting for receiver to finish ...\n");
        waitpid(pid,NULL,0);
    }

    mq_close(respQueue);

    // wait for all workers to complete
    for (i = 0; i < WORKNR; i++) {
        wk = &worklist[i];
        printf("farmer: waiting for worker to finish ...\n");
        waitpid(wk->wk_pid,NULL,0);
        mq_close(wk->wk_req);
        mq_unlink(wk->wk_mqfile);
    }

    // clean up the message queues
    mq_unlink(mqfile_to_farmer);
}

int
main(int argc,char **argv)
{
    char *cp;

    pgmname = argv[0];

    --argc;
    ++argv;

    opt_W = -1;

    for (;  argc > 0;  --argc, ++argv) {
        cp = *argv;
        if (*cp != '-')
            break;

        switch (cp[1]) {
        case 'W':
            opt_W = atoi(cp + 2);
            break;
        case 'x':
            opt_x = ! opt_x;
            break;
        }
    }

    if (opt_W >= 0)
        worker();
    else
        farmer();

    return 0;
}

UPDATE #2:

Here's a version that demonstrates single vs. multiple request queues. The workers now check the destination id in the message they receive matches their worker number.

If you just run it with no options, you'll get multiple queues and the "good" output.

If you run it with -b [and optionally -s] you'll get a single request queue and the program will see misrouted messages (e.g. worker 0 grabs a message intended for worker 1).

Single queue is a subset. As long as workers are "equal", it's okay. But, if they're not (e.g. one worker can do things others can't), being able to queue to the correct worker is important. An example would be a network node that has special FPGA assisted calculation hardware that other ones don't and some requests need that acceleration.

Also, single queue is self balancing by the workers. That is one form of scheduling, but there are other models. (e.g. the farmer wants to retain control of the distribution of labor). Or, the farmer has to stop one worker and keep the others going (e.g. the system being stopped will be powered off for maintenance).

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <time.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h>                     // for execlp
#include <mqueue.h>                     // for mq

typedef unsigned int u32;

typedef struct {
    u32 seqno;                          // sequence number
    int toval;                          // destination id
    int fmval;                          // responder worker id
} request_t;

char *pgmname;
int opt_b;                              // 1=broadcast
int opt_i;                              // 1=ignore errors
int opt_x;                              // 1=do execlp
int opt_s;                              // number of ms to sleep
int opt_S;                              // sequence maximum
int opt_W;                              // worker xid

#define WORKNR      3
#define MAXMSG      10

char mqfile_to_farmer[80];
mqd_t respQueue;

char mqfile_to_worker[80];
mqd_t reqQueue;

struct mq_attr attr;

pid_t ppid;
pid_t curpid;
pid_t pidrcvr;

// per-worker control
typedef struct {
    int wk_xid;
    pid_t wk_pid;
    mqd_t wk_req;
    u32 wk_seqno;
    char wk_mqfile[80];
} worker_t;
worker_t worklist[WORKNR];

#define FORALL_WK \
    wk = &worklist[0];  wk < &worklist[WORKNR];  ++wk

#define sysfault(_fmt...) \
    do { \
        printf(_fmt); \
        if (ppid) \
            kill(ppid,SIGUSR1); \
        exit(1); \
    } while (0)

void
_sysfault(void)
{

    __asm__ __volatile__("" :::);
}

#define logprt(_fmt...) \
    do { \
        int sverr = errno; \
        _logprt(); \
        printf(_fmt); \
        errno = sverr; \
    } while (0)

int logxid;
double logzero;

void
loginit(int xid)
{

    logxid = xid;
}

void
_logprt(void)
{
    struct timespec ts;
    double sec;

    clock_gettime(CLOCK_REALTIME,&ts);
    sec = ts.tv_nsec;
    sec /= 1e9;
    sec += ts.tv_sec;

    if (logzero == 0)
        logzero = sec;

    sec -= logzero;

    switch (logxid) {
    case WORKNR:
        printf("%.9f LOG F: ",sec);
        break;
    case WORKNR + 1:
        printf("%.9f LOG R: ",sec);
        break;
    default:
        printf("%.9f LOG W%d: ",sec,logxid);
        break;
    }
}

void
logexit(int code)
{

    exit(code);
}

void
allwait(void)
{
    worker_t *wk;

    // wait for farmer's receiver to complete
    if (pidrcvr) {
        logprt("farmer: waiting for receiver to finish ...\n");
        waitpid(pidrcvr,NULL,0);
        pidrcvr = 0;
    }

    for (FORALL_WK) {
        if (wk->wk_pid) {
            logprt("farmer: waiting for worker %d to finish ...\n",wk->wk_xid);
            waitpid(wk->wk_pid,NULL,0);
            wk->wk_pid = 0;
        }

        if (opt_b)
            continue;

        logprt("farmer: closing and removing worker queue ...\n");
        mq_close(wk->wk_req);
        mq_unlink(wk->wk_mqfile);
    }
}

void
sighdr(int signo)
{
    worker_t *wk;

    switch (signo) {
    case SIGUSR1:  // request to master
        logprt("sighdr: got master stop signal ...\n");

        if (pidrcvr)
            kill(pidrcvr,SIGUSR2);

        for (FORALL_WK) {
            if (wk->wk_pid)
                kill(wk->wk_pid,SIGUSR2);
        }

        allwait();
        logprt("farmer: abnormal termination\n");

        logexit(1);
        break;

    case SIGUSR2:  // request to slaves
        logexit(1);
        break;
    }
}

void
reqopen(mqd_t *fdp,const char *file,int flag)
{
    mqd_t fd;
    int err;

    attr.mq_maxmsg = MAXMSG;
    attr.mq_msgsize = sizeof(request_t);

    fd = *fdp;
    if (fd >= 0)
        mq_close(fd);

    fd = mq_open(file,flag | O_CREAT,0600,&attr);
    if (fd < 0)
        sysfault("reqopen: %s open fault -- %s\n",file,strerror(errno));

    err = mq_getattr(fd,&attr);
    if (err < 0)
        sysfault("reqopen: %s getattr fault -- %s\n",file,strerror(errno));

    if (attr.mq_msgsize != sizeof(request_t))
        sysfault("reqopen: %s size fault -- mq_msgsize=%ld siz=%ld\n",
            file,attr.mq_msgsize,sizeof(request_t));

    logprt("reqopen: open -- file='%s' fd=%d\n",file,fd);

    *fdp = fd;
}

void worker(int execflg);

void
farmer(void)
{
    request_t req;
    request_t resp;
    ssize_t sent;
    worker_t *wk;
    u32 seqno;
    int xid;

    ppid = getpid();
    curpid = ppid;
    loginit(WORKNR);

    sprintf(mqfile_to_farmer,"/mq_response_%d",ppid);
    sprintf(mqfile_to_worker,"/mq_request_%d",ppid);

    respQueue = -1;
    reqopen(&respQueue,mqfile_to_farmer,O_RDONLY | O_CREAT | O_EXCL);

    reqQueue = -1;
    if (opt_b)
        reqopen(&reqQueue,mqfile_to_worker,O_WRONLY | O_CREAT | O_EXCL);

    // create the separate request queues
    xid = 0;
    for (FORALL_WK) {
        wk->wk_xid = xid++;

        if (opt_b) {
            logprt("farmer: common request queue -- reqQueue=%d\n",reqQueue);
            wk->wk_req = reqQueue;
            continue;
        }

        sprintf(wk->wk_mqfile,"/mq_request_%d_%d",ppid,wk->wk_xid);

        wk->wk_req = -1;
        reqopen(&wk->wk_req,wk->wk_mqfile,O_WRONLY | O_CREAT | O_EXCL);
        logprt("farmer: separate request queue -- wk_req=%d\n",wk->wk_req);
    }

    // fork the workers
    for (FORALL_WK) {
        pid_t pid = fork();

        if (pid < 0)
            sysfault("farmer: fork fault -- %s\n",strerror(errno));

        if (pid != 0) {
            wk->wk_pid = pid;
            continue;
        }

        // NOTE/FIX: exec* takes strings so this is the correct way
        if (opt_x) {
            char opt[2][20];

            sprintf(opt[0],"-b%d",opt_b);
            sprintf(opt[1],"-W%d",wk->wk_xid);

            execlp(pgmname,pgmname,opt[0],opt[1],NULL);
            sysfault("farmer: execlp error -- %s\n",strerror(errno));
        }

        // simulate what exec would do -- call it direct
        opt_W = wk->wk_xid;
        worker(0);
    }

    pidrcvr = fork();
    if (pidrcvr < 0)
        sysfault("farmer: fork2 error -- %s\n",strerror(errno));

    // receiving done here
    if (pidrcvr == 0) {
        curpid = getpid();
        loginit(WORKNR + 1);

        for (int i = 0; i < (WORKNR * opt_S); i++) {
            // read the messages from the worker queue
            ssize_t received = mq_receive(respQueue,(char *) &resp,
                sizeof(resp),NULL);

            wk = &worklist[resp.fmval];
            logprt("received worker response: length %d fmval=%d seqno=%u wk_seqno=%u\n",
                (int) received,resp.fmval,resp.seqno,wk->wk_seqno);

            if (received < 0) {
                if (! opt_i)
                    sysfault("farmer: received fault -- %s\n",strerror(errno));
            }

            if (resp.seqno != wk->wk_seqno) {
                logprt("sequence fault\n");
                if (! opt_i)
                    sysfault("farmer: sequence fault\n");
            }

            ++wk->wk_seqno;
        }

        // send stop to worker processes
        for (FORALL_WK) {
            req.toval = -1;
            sent = mq_send(wk->wk_req,(char *) &req,sizeof(req),0);
            logprt("Farmer sent stop -- wk_xid=%d sent=%d\n",
                wk->wk_xid,(int) sent);

            if (sent < 0) {
                if (! opt_i)
                    sysfault("farmer: send fault on stop -- %s\n",
                        strerror(errno));
            }
        }

        // exit the farmer's receiver
        logprt("farmer: receiver exiting ...\n");
        logexit(0);
    }

    // sending done here
    else {
        for (seqno = 0;  seqno < opt_S;  ++seqno) {
            for (FORALL_WK) {
                wk->wk_seqno = seqno;
                req.seqno = seqno;
                req.toval = wk->wk_xid;

                sent = mq_send(wk->wk_req,(char *) &req,sizeof(req),0);
                logprt("Farmer sent to wk_xid=%d wk_req=%d -- sent=%d\n",
                    wk->wk_xid,wk->wk_req,(int) sent);
                if (sent < 0) {
                    if (! opt_i)
                        sysfault("farmer: send fault -- %s\n",strerror(errno));
                }
            }
        }
    }

    mq_close(respQueue);

    // wait for all workers to complete
    allwait();

    // clean up the message queues
    mq_unlink(mqfile_to_farmer);

    logprt("farmer: complete\n");
    logexit(0);
}

void
worker(int execflg)
{
    request_t req;
    request_t resp;
    ssize_t sent;
    u32 seqno;
    int slpcnt;

    if (execflg)
        ppid = getppid();
    curpid = getpid();

    loginit(opt_W);
    logprt("worker: my index is %d ...\n",opt_W);

    attr.mq_maxmsg = MAXMSG;

    sprintf(mqfile_to_farmer,"/mq_response_%d",ppid);
    reqopen(&respQueue,mqfile_to_farmer,O_WRONLY);

    if (opt_b)
        sprintf(mqfile_to_worker,"/mq_request_%d",ppid);
    else
        sprintf(mqfile_to_worker,"/mq_request_%d_%d",ppid,opt_W);
    reqopen(&reqQueue,mqfile_to_worker,O_RDONLY);

    seqno = 0;

    slpcnt = opt_s;
    slpcnt *= 1000;
    slpcnt *= opt_W;

    while (1) {
        if (slpcnt > 0) {
            logprt("sleep %d\n",slpcnt);
            usleep(slpcnt);
            slpcnt = 0;
        }

        // receiving
        errno = 0;
        ssize_t received = mq_receive(reqQueue,(char *) &req,
            sizeof(req),NULL);

        logprt("received length %d -- seqno=%u toval=%d\n",
            (int) received,req.seqno,req.toval);

        if (received < 0)
            sysfault("worker: mq_receive fault -- %s\n",strerror(errno));

        // received stop signal
        if (req.toval < 0) {
            logprt("stopping ...\n");
            break;
        }

        if (req.toval != opt_W) {
            logprt("misroute\n");
            if (! opt_i)
                sysfault("worker: misroute fault\n");
        }

        if (req.seqno != seqno) {
            logprt("sequence fault\n");
            if (! opt_i)
                sysfault("worker: sequence fault\n");
        }

        // do something with request data
        resp.seqno = req.seqno;
        resp.toval = req.toval;
        resp.fmval = opt_W;

        // send response
        errno = 0;
        sent = mq_send(respQueue,(char *) &resp,sizeof(resp),0);

        logprt("sent response with length %d -- seqno=%u toval=%d\n",
            (int) sent,req.seqno,resp.toval);

        // HERE &resp is always different (doesn't print)
        if (sent < 0)
            sysfault("worker: mq_send fault -- %s\n",strerror(errno));

        ++seqno;
    }

    mq_close(reqQueue);
    mq_close(respQueue);

    logexit(0);
}

int
main(int argc,char **argv)
{
    char *cp;

    pgmname = argv[0];

    --argc;
    ++argv;

    opt_W = -1;
    opt_S = 3;

    reqQueue = -1;
    respQueue = -1;

    signal(SIGUSR1,sighdr);
    signal(SIGUSR2,sighdr);

    for (;  argc > 0;  --argc, ++argv) {
        cp = *argv;
        if (*cp != '-')
            break;

        switch (cp[1]) {
        case 'b':  // broadcast mode (single request queue)
            cp += 2;
            opt_b = (*cp != 0) ? atoi(cp) : 1;
            break;

        case 'i':  // ignore errors
            cp += 2;
            opt_i = (*cp != 0) ? atoi(cp) : 1;
            break;

        case 'S':  // sequence maximum
            cp += 2;
            opt_S = (*cp != 0) ? atoi(cp) : 3;
            break;

        case 's':  // sleep mode (milliseconds)
            cp += 2;
            opt_s = (*cp != 0) ? atoi(cp) : 3;
            break;

        case 'W':  // worker number
            cp += 2;
            opt_W = atoi(cp + 2);
            break;

        case 'x':  // use execlp
            opt_x = ! opt_x;
            break;
        }
    }

    if (opt_W >= 0)
        worker(1);
    else
        farmer();

    return 0;
}
Craig Estey
  • 30,627
  • 4
  • 24
  • 48
  • In my original code, I also print what resp contains s.a. resp.a where a is an integer. It always shows the same value as well. I'll update my question with that, thanks. – wraithie Sep 23 '16 at 19:49
  • Try to post enough that it could be downloaded and run. Otherwise, it's difficult to speculate whether the parent is erring or child/client is just always sending the same data. It's the _do stuff with request data_ code we need to see. – Craig Estey Sep 23 '16 at 19:52
  • Working on runnable code. I get a "No such file or directory" segmentation fault on `sprintf (mq_farmer, "/mq_request_%s", getpid());`. Don't know why. – wraithie Sep 23 '16 at 21:29
  • Changed it to `sprintf (mq_farmer, "/mq_request_%s_%d", "foo", getpid());` and error is gone. Again don't know why. Above is my attempt at a runnable code only worker doesn't receive I think, at least it doesn't print the "Worker received" message. – wraithie Sep 23 '16 at 22:46
  • The wronly instead of rdonly for the resp queue was the problem, thanks. – wraithie Sep 24 '16 at 08:23