2

I am currently learning how to use the ZeroMQ library, which a friend advised me to use for a personal project.

After reading the documentation and planning how to use the library in my project, I began testing with the code given in the documentation. The test I used was this one. Unfortunately, it doesn't work. I made some minor modifications to test it. (Below is the exact code I have in my test. It is long, and I am sorry about that, but without all of it I think the question would not make sense and it would be impossible to help me.)

I changed almost nothing from the test given in the documentation: I just added some output for testing, and I also removed the poll in the client (I thought the problem came from there, because it was blocking the infinite loop even though there was a timeout).

    #include <vector>
    #include <thread>
    #include <memory>
    #include <functional>


    #include <zmq.h>
    #include <zmq.hpp>
    #include <zhelper.hpp>

    //  This is our client task class.
    //  It connects to the server, and then sends a request once per second
    //  It collects responses as they arrive, and it prints them out. We will
    //  run several client tasks in parallel, each with a different random ID.
    //  Attention! -- this random work well only on linux.

    // Client task as posted in the question (the failing reproduction).
    // Owns its own zmq context and a ZMQ_DEALER socket created on it.
    class client_task {
    public:
        // Builds the context (constructed with the argument 1) and the DEALER
        // socket that lives on it.
        client_task()
            : ctx_(1),
              client_socket_(ctx_, ZMQ_DEALER)
        {}

        // Sets a random identity, connects to tcp://localhost:5570 and then
        // loops forever sending numbered requests. Returns only if something
        // inside the loop throws a std::exception.
        void start() {
            // generate random identity
            // "%04X-%04X" needs 9 characters plus the terminating NUL, which is
            // exactly identity[10] -- assuming within(0x10000) never returns a
            // value wider than four hex digits (TODO confirm in zhelper.hpp).
            char identity[10] = {};
            sprintf(identity, "%04X-%04X", within(0x10000), within(0x10000));
            printf("-> %s\n", identity);
            client_socket_.setsockopt(ZMQ_IDENTITY, identity, strlen(identity));
            client_socket_.connect("tcp://localhost:5570");

            // Poll descriptor for the client socket.
            // NOTE(review): items.socket receives the address of the
            // zmq::socket_t wrapper object. libzmq's zmq_pollitem_t::socket is
            // presumably expected to hold the underlying socket handle instead
            // -- confirm against the zmq_poll documentation.
            zmq_pollitem_t items;
            items.socket = &client_socket_;
            items.fd = 0;
            items.events = ZMQ_POLLIN;
            items.revents = 0;

            // Counter used to number the outgoing requests.
            int request_nbr = 0;
            try {
                while (true) {

                    for (int i = 0; i < 100; ++i) {

                        // NOTE(review): this comment used to read "10 milliseconds",
                        // a leftover from a poll timeout; if this is the POSIX
                        // sleep(), the argument is in seconds.
                        sleep(1);
                        // Progress marker ("ici" is French for "here").
                        std::cout << "ici" << std::endl;
                        // `items` is never handed to a poll call in this function, so
                        // revents still holds the 0 assigned above and this branch is
                        // never taken.
                        if (items.revents & ZMQ_POLLIN) {
                            printf("\n%s ", identity);
                            s_dump(client_socket_);
                        }

                        // "request #" is 9 characters, leaving room for at most 6
                        // digits plus the NUL in request_string[16].
                        char request_string[16] = {};
                        sprintf(request_string, "request #%d", ++request_nbr);
                        client_socket_.send(request_string, strlen(request_string));

                    }
                }

            }
            // Any std::exception ends the loop silently: nothing is logged.
            catch (std::exception &e)
            {}
        }

    private:
        // Declared before the socket, so it is constructed first and destroyed last.
        zmq::context_t ctx_;
        zmq::socket_t client_socket_;
    };

    //  Each worker task works on one request at a time and sends a random number
    //  of replies back, with random delays between replies:

    // Worker as posted in the question. Holds a reference to the caller's
    // context and owns one socket of the requested type created on it.
    class server_worker {
    public:
        // ctx:       context the worker socket is created on (kept by reference).
        // sock_type: zmq socket type passed straight to the socket constructor.
        server_worker(zmq::context_t &ctx, int sock_type)
            : ctx_(ctx),
              worker_(ctx_, sock_type)
        {}

        // Connects to inproc://backend and loops forever: receives two frames
        // (identity, then message body) and sends both back within(5) times.
        // Returns only if a std::exception is thrown inside the loop.
        void work() {
                worker_.connect("inproc://backend");

            try {
                while (true) {
                    zmq::message_t identity;
                    zmq::message_t msg;
                    // Scratch messages that are refilled with copies before each send.
                    zmq::message_t copied_id;
                    zmq::message_t copied_msg;
                    worker_.recv(&identity);
                    worker_.recv(&msg);
                    // Debug marker printed once both recv calls have returned.
                    std::cout << "I never arrive here" << std::endl;

                    // Number of replies for this request, taken from within(5).
                    int replies = within(5);
                    for (int reply = 0; reply < replies; ++reply) {
                        // Debug marker ("LA" is presumably French "là", i.e. "there").
                        std::cout << "LA" << std::endl;
                        // Delay before each reply; presumably milliseconds -- confirm
                        // s_sleep's unit in zhelper.hpp.
                        s_sleep(within(1000) + 1);
                        copied_id.copy(&identity);
                        copied_msg.copy(&msg);
                        // Identity frame first (flagged as "more to come"), then the body.
                        worker_.send(copied_id, ZMQ_SNDMORE);
                        worker_.send(copied_msg);
                    }
                }
            }
            // Any std::exception ends the loop silently: nothing is logged.
            catch (std::exception &e) {}
        }

    private:
        // Not owned: refers to the context passed to the constructor.
        zmq::context_t &ctx_;
        zmq::socket_t worker_;
    };

    //  This is our server task.
    //  It uses the multithreaded server model to deal requests out to a pool
    //  of workers and route replies back to clients. One worker can handle
    //  one request at a time but one client can talk to multiple workers at
    //  once.

    // Server task as posted in the question. Owns a context, a ROUTER
    // frontend socket and a DEALER backend socket.
    class server_task {
    public:
        // Builds the context (constructed with the argument 1) and both sockets.
        server_task()
            : ctx_(1),
              frontend_(ctx_, ZMQ_ROUTER),
              backend_(ctx_, ZMQ_DEALER)
        {}

        // Binds the frontend to tcp://*:5570 and the backend to
        // inproc://backend, starts one worker thread and then calls zmq::proxy
        // on the two sockets.
        void run() {
            frontend_.bind("tcp://*:5570");
            backend_.bind("inproc://backend");

            // NOTE(review): the worker is allocated with `new` and no matching
            // `delete` appears in this class; the detached thread keeps using it.
            server_worker * worker = new server_worker(ctx_, ZMQ_DEALER);
            std::thread worker_thread(std::bind(&server_worker::work, worker));
            worker_thread.detach();

            try {
                // NOTE(review): the addresses of the zmq::socket_t wrapper objects
                // are passed here; zmq::proxy presumably expects the underlying
                // socket handles -- confirm against the cppzmq documentation.
                zmq::proxy(&frontend_, &backend_, NULL);
            }
            // Any std::exception from the proxy is swallowed without logging.
            catch (std::exception &e) {}

        }

    private:
        // Declared before the sockets, so it is constructed first and destroyed last.
        zmq::context_t ctx_;
        zmq::socket_t frontend_;
        zmq::socket_t backend_;
    };

    //  The main thread simply starts several clients and a server, and then
    //  waits for the server to finish.

    // Entry point of the question's reproduction: starts three client threads
    // and one server thread, detaches them all, and waits for a character on
    // stdin before returning 0.
    int main (void)
    {
        client_task ct1;
        client_task ct2;
        client_task ct3;
        server_task st;

        // Each thread runs a member function on one of the local objects above.
        // Here the three clients are started before the server.
        std::thread t1(std::bind(&client_task::start, &ct1));
        std::thread t2(std::bind(&client_task::start, &ct2));
        std::thread t3(std::bind(&client_task::start, &ct3));
        std::thread t4(std::bind(&server_task::run, &st));

        // NOTE(review): the threads are detached while holding pointers to the
        // locals above, which are destroyed when main returns.
        t1.detach();
        t2.detach();
        t3.detach();
        t4.detach();
        std::cout << "ok" << std::endl;
        // Blocks until a character is read from stdin.
        getchar();
        std::cout << "ok" << std::endl;
        return 0;
    }

The output I get from this code is the following :

-> CC66-C879
-> 3292-E961
-> C4AA-55D1
ok
ici
ici
ici
... (infinite ici)

I really don't understand why it doesn't work. The poll in the client throws an exception: "Socket operation on non-socket". The major problem for me is that this test comes from the official documentation and I can't make it work. What is wrong with the way I am using the socket?

Thanks for your help

Lightness Races in Orbit
  • 378,754
  • 76
  • 643
  • 1,055
FreeYourSoul
  • 328
  • 4
  • 15

1 Answers1

4

I found out the problem.

There is a problem in the official documentation (some obvious mistakes, such as the initialisation of the zmq_pollitem_t array), and there was another one that kept my test from working.

For zmq::poll and zmq::proxy, you need to cast the socket object to void*; you must not pass a pointer to the socket object. See: ZMQ poll not working

After those modifications it worked. I wrote another post to explain why here.

Here is the corrected code, without my additional testing output:

        //  Asynchronous client-to-server (DEALER to ROUTER)
    //
    //  While this example runs in a single process, that is to make
    //  it easier to start and stop the example. Each task has its own
    //  context and conceptually acts as a separate process.

    #include <cstdio>
    #include <cstring>
    #include <functional>
    #include <iostream>
    #include <memory>
    #include <thread>
    #include <vector>

    #include <zmq.h>
    #include <zmq.hpp>
    #include <zhelper.hpp>

    //  This is our client task class.
    //  It connects to the server, and then sends a request once per second
    //  It collects responses as they arrive, and it prints them out. We will
    //  run several client tasks in parallel, each with a different random ID.
    //  Attention! -- this random work well only on linux.

    //  Asynchronous client: a DEALER socket on its own context that sends one
    //  numbered request after every batch of polls and dumps any replies that
    //  arrive in between.
    class client_task {
    public:
        //  Builds the context and the DEALER socket that lives on it.
        client_task()
            : ctx_(1),
              client_socket_(ctx_, ZMQ_DEALER)
        {}

        //  Sets a random identity, connects to tcp://localhost:5555 and runs
        //  the poll/send loop. Returns only when a call inside the loop throws
        //  a std::exception; the error is then reported on stdout.
        void start() {
            // generate random identity
            // "XXXX-XXXX" is 9 characters plus the NUL; snprintf keeps the write
            // inside the buffer even if within() ever returns a wider value.
            char identity[10] = {};
            std::snprintf(identity, sizeof(identity), "%04X-%04X",
                          within(0x10000), within(0x10000));
            std::printf("-> %s\n", identity);
            client_socket_.setsockopt(ZMQ_IDENTITY, identity, std::strlen(identity));
            client_socket_.connect("tcp://localhost:5555");

            // zmq::poll works on the underlying socket handle, which socket_t
            // exposes through its conversion to void* -- not on the address of
            // the C++ wrapper object.
            zmq_pollitem_t items[1];
            items[0].socket = static_cast<void *>(client_socket_);
            items[0].fd = 0;
            items[0].events = ZMQ_POLLIN;
            items[0].revents = 0;

            const int polls_per_request = 100;  // poll rounds between two requests
            const long poll_timeout = 10;       // timeout handed to each zmq::poll call

            int request_nbr = 0;
            try {
                while (true) {
                    for (int i = 0; i < polls_per_request; ++i) {
                        zmq::poll(items, 1, poll_timeout);
                        if (items[0].revents & ZMQ_POLLIN) {
                            std::printf("\n%s =>", identity);
                            s_dump(client_socket_);
                        }
                    }

                    // Large enough for "request #" plus any int value; the
                    // previous 16-byte buffer overflowed from request 1000000 on.
                    char request_string[32] = {};
                    std::snprintf(request_string, sizeof(request_string),
                                  "request #%d", ++request_nbr);
                    client_socket_.send(request_string, std::strlen(request_string));
                }
            }
            catch (const std::exception &e)
            {
                // Read the error code first: the stream output below may change it.
                const int err = zmq_errno();
                std::cout << "exception :  "  << err << " "<< e.what() << std::endl;
                if (err == EINTR)
                    std::cout << "lol"<< std::endl;
            }
        }

    private:
        // Declared before the socket so it is constructed first and destroyed last.
        zmq::context_t ctx_;
        zmq::socket_t client_socket_;
    };

    //  Each worker task works on one request at a time and sends a random number
    //  of replies back, with random delays between replies:

    class server_worker {
    public:
        server_worker(zmq::context_t &ctx, int sock_type)
            : ctx_(ctx),
              worker_(ctx_, sock_type)
        {}

        void work() {
                worker_.connect("inproc://backend");

            try {
                while (true) {
                    zmq::message_t identity;
                    zmq::message_t msg;
                    zmq::message_t copied_id;
                    zmq::message_t copied_msg;
                    worker_.recv(&identity);
                    worker_.recv(&msg);

                    int replies = within(5);
                    for (int reply = 0; reply < replies; ++reply) {
                        s_sleep(within(1000) + 1);
                        copied_id.copy(&identity);
                        copied_msg.copy(&msg);
                        worker_.send(copied_id, ZMQ_SNDMORE);
                        worker_.send(copied_msg);
                    }
                }
            }
            catch (std::exception &e)
            {
                std::cout << "Error in worker : " << e.what() << std::endl;
            }
        }

    private:
        zmq::context_t &ctx_;
        zmq::socket_t worker_;
    };

    //  This is our server task.
    //  It uses the multithreaded server model to deal requests out to a pool
    //  of workers and route replies back to clients. One worker can handle
    //  one request at a time but one client can talk to multiple workers at
    //  once.

    class server_task {
    public:
        server_task()
            : ctx_(1),
              frontend_(ctx_, ZMQ_ROUTER),
              backend_(ctx_, ZMQ_DEALER)
        {}

        void run() {
            frontend_.bind("tcp://*:5555");
            backend_.bind("inproc://backend");

            server_worker * worker = new server_worker(ctx_, ZMQ_DEALER);
            std::thread worker_thread(std::bind(&server_worker::work, worker));
            worker_thread.detach();

            try {
                zmq::proxy(static_cast<void *>(frontend_), static_cast<void *> (backend_), NULL);
            }
            catch (std::exception &e)
            {
                std::cout << "Error in Server : " << e.what() << std::endl;
            }

        }

    private:
        zmq::context_t ctx_;
        zmq::socket_t frontend_;
        zmq::socket_t backend_;
    };

    //  The main thread simply starts several clients and a server, and then
    //  waits for the server to finish.

    int main (void)
    {
        client_task ct1;
        client_task ct2;
        client_task ct3;
        server_task st;

        std::thread t4(std::bind(&server_task::run, &st));
        t4.detach();
        std::thread t1(std::bind(&client_task::start, &ct1));
        std::thread t2(std::bind(&client_task::start, &ct2));
        std::thread t3(std::bind(&client_task::start, &ct3));

        t1.detach();
        t2.detach();
        t3.detach();

        getchar();
        return 0;
    }
Community
  • 1
  • 1
FreeYourSoul
  • 328
  • 4
  • 15