Intention
I want a client to be able to send a task to a server at a fixed address. The server may perform that task at some arbitrary point in the future, and must remain able to accept requests from other clients in the meantime. After performing the task, the server replies to the client, which may have been blocking on the reply. Clients and tasks arrive dynamically, so their number can't be fixed up front. The work is done in a non-thread-safe context, so workers can't run on separate threads; all of the work has to take place in a single thread.
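The "accept now, reply later" requirement amounts to the server remembering, for each outstanding task, who asked for it. A minimal sketch of that bookkeeping in plain C, independent of any ZeroMQ socket type, is given here. The names `pending_t`, `pending_queue_t`, `pending_add`, and `pending_take` are hypothetical, and the sketch assumes a client can be identified by a short byte string (`ID_MAX` is an assumed bound, not something stated above).

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define ID_MAX 255 /* assumed upper bound on the length of a client identity */

/* One request that has been accepted but not yet answered. */
typedef struct pending {
    unsigned char   id[ID_MAX]; /* who to reply to */
    size_t          id_len;
    int             task;       /* the integer the client sent */
    struct pending *next;
} pending_t;

/* FIFO of outstanding requests; grows on demand, so no fixed client count. */
typedef struct {
    pending_t *head;
    pending_t *tail;
    size_t     count;
} pending_queue_t;

void pending_init(pending_queue_t *q)
{
    assert(q != NULL);
    q->head = q->tail = NULL;
    q->count = 0;
}

/* Record a request. Returns 0 on success, -1 on a bad identity or failed allocation. */
int pending_add(pending_queue_t *q, const void *id, size_t id_len, int task)
{
    assert(q != NULL);
    if (id == NULL || id_len == 0 || id_len > ID_MAX)
        return -1;
    pending_t *p = malloc(sizeof *p);
    if (p == NULL)
        return -1;
    memcpy(p->id, id, id_len);
    p->id_len = id_len;
    p->task = task;
    p->next = NULL;
    if (q->tail != NULL)
        q->tail->next = p;
    else
        q->head = p;
    q->tail = p;
    q->count++;
    return 0;
}

/* Detach the oldest outstanding request, or return NULL if there is none.
 * The caller owns the result and must free() it after replying. */
pending_t *pending_take(pending_queue_t *q)
{
    assert(q != NULL);
    pending_t *p = q->head;
    if (p == NULL)
        return NULL;
    q->head = p->next;
    if (q->head == NULL)
        q->tail = NULL;
    q->count--;
    p->next = NULL;
    return p;
}
```

In a single-threaded server loop, each incoming request would be stored with `pending_add`, and whenever a task is finished the matching entry would be removed with `pending_take` and used to address the reply. The FIFO order is only a simplification; nothing above requires replies to be ordered.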
Implementation
The following example [1] is not a complete implementation of the server, only a compilable excerpt of the sequence that should be able to take place (but in reality hangs). Two clients each send an integer. The server takes the first request, then the second, then echoes a reply to the first request, then echoes a reply to the second. The intention isn't to order the responses, only to let the server hold multiple requests at once. What actually happens is that the second worker hangs waiting for its request. This is what confuses me, as DEALER sockets should distribute outgoing messages in a round-robin fashion.
#include <unistd.h>
#include <stdio.h>
#include <zmq.h>
#include <sys/wait.h>

/* Client: send one integer, then block until the reply arrives. */
int client(int num)
{
    void *context, *client;
    int buf[1];

    context = zmq_ctx_new();
    client = zmq_socket(context, ZMQ_REQ);
    zmq_connect(client, "tcp://localhost:5559");

    *buf = num;
    zmq_send(client, buf, sizeof buf, 0);   /* send the whole int, not 1 byte */
    *buf = 0;
    zmq_recv(client, buf, sizeof buf, 0);
    printf("client %d receiving: %d\n", num, *buf);

    zmq_close(client);
    zmq_ctx_destroy(context);
    return 0;
}

/* Forward one complete (possibly multipart) message from one socket to another. */
void multipart_proxy(void *from, void *to)
{
    zmq_msg_t message;

    while (1) {
        zmq_msg_init(&message);
        zmq_msg_recv(&message, from, 0);
        int more = zmq_msg_more(&message);
        zmq_msg_send(&message, to, more ? ZMQ_SNDMORE : 0);
        zmq_msg_close(&message);
        if (!more) break;
    }
}

int main(void)
{
    int status;

    /* Two client processes, one request each. */
    if (fork() == 0) {
        client(1);
        return 0;
    }
    if (fork() == 0) {
        client(2);
        return 0;
    }

    /* SERVER */
    void *context, *frontend, *backend, *worker1, *worker2;
    int wbuf1[1], wbuf2[1];

    context = zmq_ctx_new();
    frontend = zmq_socket(context, ZMQ_ROUTER);
    backend = zmq_socket(context, ZMQ_DEALER);
    zmq_bind(frontend, "tcp://*:5559");
    zmq_bind(backend, "inproc://workers");

    /* Take the first request. */
    worker1 = zmq_socket(context, ZMQ_REP);
    zmq_connect(worker1, "inproc://workers");
    multipart_proxy(frontend, backend);
    *wbuf1 = 0;
    zmq_recv(worker1, wbuf1, sizeof wbuf1, 0);
    printf("worker1 receiving: %d\n", *wbuf1);

    /* Take the second request while the first is still unanswered. */
    worker2 = zmq_socket(context, ZMQ_REP);
    zmq_connect(worker2, "inproc://workers");
    multipart_proxy(frontend, backend);
    *wbuf2 = 0;
    zmq_recv(worker2, wbuf2, sizeof wbuf2, 0);   /* hangs here */
    printf("worker2 receiving: %d\n", *wbuf2);

    /* Echo the replies back, first request first. */
    zmq_send(worker1, wbuf1, sizeof wbuf1, 0);
    multipart_proxy(backend, frontend);
    zmq_send(worker2, wbuf2, sizeof wbuf2, 0);
    multipart_proxy(backend, frontend);

    wait(&status);
    zmq_close(frontend);
    zmq_close(backend);
    zmq_close(worker1);
    zmq_close(worker2);
    zmq_ctx_destroy(context);
    return 0;
}
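A toy model of round-robin distribution, written in plain C without ZeroMQ, may help pin down what "round-robin" does and does not guarantee when a peer joins after the first message has already been sent. This is not libzmq's implementation; `rr_t`, `rr_add_peer`, and `rr_next` are hypothetical names, and the model simply assumes a cursor that advances by one and wraps modulo the current number of peers.

```c
#include <assert.h>
#include <stddef.h>

#define RR_MAX_PEERS 8 /* arbitrary capacity for the toy model */

typedef struct {
    int    peers[RR_MAX_PEERS]; /* peer ids in the order they connected */
    size_t count;               /* number of connected peers */
    size_t cursor;              /* index of the peer that gets the next message */
} rr_t;

void rr_init(rr_t *rr)
{
    assert(rr != NULL);
    rr->count = 0;
    rr->cursor = 0;
}

/* A new peer is appended; the cursor is left where it was. */
void rr_add_peer(rr_t *rr, int id)
{
    assert(rr != NULL && rr->count < RR_MAX_PEERS);
    rr->peers[rr->count++] = id;
}

/* Pick the peer for the next outgoing message and advance the cursor. */
int rr_next(rr_t *rr)
{
    assert(rr != NULL && rr->count > 0);
    int id = rr->peers[rr->cursor];
    rr->cursor = (rr->cursor + 1) % rr->count;
    return id;
}
```

In this model, if peer 1 is added, one message is dispatched, and only then peer 2 is added, the second message is again assigned to peer 1: with a single peer the cursor wraps straight back to index 0. Whether the DEALER socket in the example behaves the same way is an open question, but it is one way a round-robin distributor could leave the second worker waiting.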
Other Options
I have looked at CLIENT and SERVER sockets, and they appear capable on paper. In practice, however, they are new enough that the system version of ZeroMQ I have doesn't support them yet. If this can't be done in ZeroMQ, alternative suggestions are very welcome.
[1] Based on the Shared Queue section of the ZeroMQ guide.