
I recently did some testing with kernel events and I came up with the following:

  • Does it make sense to use a kernel event for accepting sockets? In my testing I was only able to handle one accept at a time, even when the eventlist array was bigger. (That makes sense to me, because .ident == sockfd is only true for one socket.)

  • I thought the use of kevent is mainly to read from multiple sockets at once. Is that true?

Is this how a TCP server is built with kqueue?


  • Listening Thread (without kqueue)
    • Accepts new connections and adds each FD to a worker kqueue. QUESTION: Is this even possible? My testing says yes, but is it guaranteed that the worker thread will see the changes, and is kevent really thread-safe?

  • Worker thread (with kqueue)

    • Waits for reads on the file descriptors added by the listening thread.

    QUESTION: How many sockets at once would make sense to check for updates?


Thanks

Marco

2 Answers


This is not really an answer, but I wrote a small kqueue server that demonstrates the problem:

#include <stdio.h>          // fprintf
#include <sys/event.h>      // kqueue
#include <netdb.h>          // addrinfo
#include <arpa/inet.h>      // AF_INET
#include <sys/socket.h>     // socket
#include <assert.h>         // assert
#include <string.h>         // bzero
#include <stdbool.h>        // bool
#include <unistd.h>         // close

int main(int argc, const char * argv[])
{

    /* Initialize server socket */
    struct addrinfo hints, *res;
    int sockfd;

    bzero(&hints, sizeof(hints));
    hints.ai_family     = AF_INET;
    hints.ai_socktype   = SOCK_STREAM;

    assert(getaddrinfo("localhost", "9090", &hints, &res) == 0);

    sockfd = socket(AF_INET, SOCK_STREAM, res->ai_protocol);

    assert(sockfd > 0);

    {
        unsigned opt = 1;

        assert(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == 0);

        #ifdef SO_REUSEPORT
        assert(setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) == 0);
        #endif
    }

    assert(bind(sockfd, res->ai_addr, res->ai_addrlen) == 0);

    freeaddrinfo(res);

    /* Start to listen */
    (void)listen(sockfd, 5);

    {
        /* kevent set */
        struct kevent kevSet;
        /* events */
        struct kevent events[20];
        /* nevents */
        unsigned nevents;
        /* kq */
        int kq;
        /* buffer */
        char buf[20];
        /* length */
        ssize_t readlen;

        kevSet.data     = 5;    // backlog is set to 5
        kevSet.fflags   = 0;
        kevSet.filter   = EVFILT_READ;
        kevSet.flags    = EV_ADD;
        kevSet.ident    = sockfd;
        kevSet.udata    = NULL;

        assert((kq = kqueue()) > 0);

        /* Update kqueue */
        assert(kevent(kq, &kevSet, 1, NULL, 0, NULL) == 0);

        /* Enter loop */
        while (true) {
            /* Wait for events to happen */
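            /* kevent() returns int (-1 on error); check it before storing it in the unsigned counter */
            int rc = kevent(kq, NULL, 0, events, 20, NULL);

            assert(rc >= 0);
            nevents = (unsigned)rc;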

            fprintf(stderr, "Got %u events to handle...\n", nevents);

            for (unsigned i = 0; i < nevents; ++i) {
                struct kevent event = events[i];
                int clientfd        = (int)event.ident;

                /* Handle disconnect */
                if (event.flags & EV_EOF) {

                    /* Simply close socket */
                    close(clientfd);

                    fprintf(stderr, "A client has left the server...\n");
                } else if (clientfd == sockfd) {
                    int nclientfd = accept(sockfd, NULL, NULL);

                    assert(nclientfd > 0);

                    /* Add to event list */
                    kevSet.data     = 0;
                    kevSet.fflags   = 0;
                    kevSet.filter   = EVFILT_READ;
                    kevSet.flags    = EV_ADD;
                    kevSet.ident    = nclientfd;
                    kevSet.udata    = NULL;

                    assert(kevent(kq, &kevSet, 1, NULL, 0, NULL) == 0);

                    fprintf(stderr, "A new client connected to the server...\n");

                    (void)write(nclientfd, "Welcome to this server!\n", 24);
                } else if (event.filter == EVFILT_READ) {   /* EVFILT_READ is a filter, not a flag bit */

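                    /* leave room for the terminator */
                    readlen = read(clientfd, buf, sizeof(buf) - 1);

                    /* read() returns -1 on error and 0 on EOF; never index buf with those */
                    buf[readlen > 0 ? readlen : 0] = 0;

                    fprintf(stderr, "bytes %zu are available to read... %s \n", (size_t)event.data, buf);

                    /* sleep for "processing" time */
                    sleep(4);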
                } else {
                    fprintf(stderr, "unknown event: %8.8X\n", event.flags);
                }
            }
        }
    }

    return 0;
}

Every time a client sends something, the server "lags" for 4 seconds. (The delay is exaggerated, but reasonable for testing.) So how do I get around that problem? One possible solution I see is a pool of worker threads, each with its own kqueue, so that no connection lag would occur. (Each worker thread would read a certain "range" of file descriptors.)

Marco
  • You should make your sockets non-blocking. But the main problem is that you cannot sleep in an event loop. The whole point of event loops is that the event handler never blocks; if it needs to wait for something to happen, it queues a wait for that event onto the event queue. Event handlers shouldn't do things which are computationally expensive, either; if there are such tasks, they need to be handed off to another thread. All of this is why event-driven programming is harder than threaded programming. – rici Aug 10 '14 at 18:24
  • Oh, and this really should have been a separate question. Do not use answers to post questions; there is no charge for asking questions and you should do so, for the benefit of those who have similar questions. – rici Aug 10 '14 at 18:25
  • @rici So how do I solve this problem? In my first version I just created a thread for each new connection, but that scales very badly as the number of connections increases. And since processing in an event loop is bad too, I don't know what to do. Could you please give me an example design for a server? (For clarity: I'm trying to implement an instant messenger server with **persistent** connections) – Marco Aug 10 '14 at 18:28
  • I'm not going to answer any more questions in comments. You probably need to use your favourite search engine to find a kqueue tutorial. In case that's too challenging, here's an example kqueue program I found in a few seconds with google: http://markmail.org/thread/rvkieyevo5u2wbtz – rici Aug 11 '14 at 04:30
  • I already did that, but found nothing as useful as this one. The others always gave bad results. For example, this server always gave me a BAD ACCESS with too many open connections (when freeing the buffer). – Marco Aug 11 '14 at 08:53
  • "Note: it's particularly important to remember that readiness notification from the kernel is only a hint; the file descriptor might not be ready anymore when you try to read from it. That's why it's important to use nonblocking mode when using readiness notification." - from [here](http://kegel.com/c10k.html#nb). So you should set O_NONBLOCK on the socket and possibly repeat the loop once. – Slav Feb 04 '16 at 08:02
  • Another approach is to set the socket to O_NONBLOCK and add the EV_CLEAR flag to the event's flags. This makes kqueue work in 'edge-triggered' mode (instead of 'level-triggered' mode), which means that when you are notified you should read the socket until it is completely drained. – Slav Feb 04 '16 at 10:30
  • @Vyacheslav Thanks for your comment, I'll have a look at it when I have time :) – Marco Feb 04 '16 at 18:03
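The advice in the comments above (non-blocking sockets, and reading until the socket is drained when EV_CLEAR is used) could be sketched roughly as follows. This is only an illustration using portable POSIX calls; the names `set_nonblocking` and `drain_fd` are made up for this sketch, and a real server would parse the data instead of discarding it.

```c
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Hypothetical helper: put fd into non-blocking mode.
 * Returns 0 on success, -1 on error. */
int set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1 ? -1 : 0;
}

/* Hypothetical helper: read until the kernel has nothing more right now.
 * Returns the total number of bytes read, or -1 on a real error.
 * *eof is set to 1 if the peer closed the connection. */
ssize_t drain_fd(int fd, int *eof)
{
    char buf[512];
    ssize_t total = 0;

    *eof = 0;
    for (;;) {
        ssize_t n = read(fd, buf, sizeof(buf));
        if (n > 0) {
            total += n;               /* a real server would parse buf here */
        } else if (n == 0) {
            *eof = 1;                 /* orderly shutdown by the peer */
            return total;
        } else if (errno == EINTR) {
            continue;                 /* interrupted by a signal, retry */
        } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return total;             /* drained; go back to kevent() */
        } else {
            return -1;
        }
    }
}
```

With a level-triggered registration (no EV_CLEAR) a single `read` per event is enough, because kevent reports the descriptor again while data remains. The drain loop matters mostly in the edge-triggered case.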

Normally, you use kqueue as an alternative to threads. If you're going to use threads, you can just set up a listening thread and a worker threadpool with one thread per accepted connection. That's a much simpler programming model.

In an event-driven framework, you would put both the listening socket and all the accepted sockets into the kqueue, and then handle events as they occur. When you accept a socket, you add it to the kqueue, and when a socket handler finishes its work, it could remove the socket from the kqueue. (The latter is not normally necessary because closing a fd automatically removes any associated events from any kqueue.)

Note that every event registered with a kqueue carries a void* user-data pointer (the udata field of struct kevent), which can be used to identify the desired action when the event fires. So an event queue is not limited to a single event handler; in fact, it is common to have a variety of handlers. (For example, you might also want to handle a control channel set up through a named pipe.)
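One way to picture the user-data idea is a small handler record whose address is stored in `udata` at registration time. The sketch below is an assumption about how this might be organised, not something taken from the answer: `struct handler`, `event_fn`, `dispatch` and the two example callbacks are hypothetical names, and the kqueue calls appear only in comments so that the dispatch logic stays portable.

```c
#include <stddef.h>

/* Hypothetical callback type and handler record. A pointer to a
 * struct handler would be stored in kevent.udata, e.g.
 *     EV_SET(&kev, fd, EVFILT_READ, EV_ADD, 0, 0, &some_handler);  */
typedef void (*event_fn)(int fd, void *ctx);

struct handler {
    event_fn fn;    /* what to run when the event fires */
    void    *ctx;   /* per-registration state handed back to fn */
};

/* Example callbacks: each one only counts how often it was called. */
void on_listen_ready(int fd, void *ctx) { (void)fd; ++*(int *)ctx; }
void on_client_ready(int fd, void *ctx) { (void)fd; ++*(int *)ctx; }

/* Called from the event loop with the fields of a returned event:
 *     dispatch((int)ev.ident, ev.udata);
 * Returns 0 if a handler ran, -1 if the event carried no handler. */
int dispatch(int fd, void *udata)
{
    struct handler *h = udata;

    if (h == NULL || h->fn == NULL)
        return -1;
    h->fn(fd, h->ctx);
    return 0;
}
```

With this layout the loop no longer needs a chain of `ident == sockfd` comparisons: the listening socket, client sockets and a control pipe each register their own handler record.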

Hybrid event/thread models are certainly possible; otherwise, you cannot take advantage of multicore CPUs. One possible strategy is to use the event queue as a dispatcher in a producer-consumer model. The queue handler would directly handle events on the listening socket, accepting the connection and adding the accepted fd into the event queue. When a client connection event occurs, the event would be posted into the workqueue for later handling. It's also possible to have multiple workqueues, one per thread, and have the accepter guess which workqueue a new connection should be placed in, presumably on the basis of that thread's current load.
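The workqueue in the producer-consumer strategy could be as simple as a bounded ring of file descriptors guarded by a mutex and a condition variable. The following is a minimal sketch under that assumption; `struct workqueue`, `wq_init`, `wq_push`, `wq_pop` and the capacity `WQ_CAP` are invented for illustration.

```c
#include <pthread.h>

#define WQ_CAP 64   /* arbitrary capacity chosen for this sketch */

struct workqueue {
    int             fds[WQ_CAP];   /* ring buffer of ready descriptors */
    int             head, count;
    pthread_mutex_t mu;
    pthread_cond_t  nonempty;
};

void wq_init(struct workqueue *q)
{
    q->head = q->count = 0;
    pthread_mutex_init(&q->mu, NULL);
    pthread_cond_init(&q->nonempty, NULL);
}

/* Producer side, called by the event loop. Never blocks:
 * returns 0 on success, -1 if the queue is full. */
int wq_push(struct workqueue *q, int fd)
{
    int rc = -1;

    pthread_mutex_lock(&q->mu);
    if (q->count < WQ_CAP) {
        q->fds[(q->head + q->count) % WQ_CAP] = fd;
        q->count++;
        rc = 0;
        pthread_cond_signal(&q->nonempty);
    }
    pthread_mutex_unlock(&q->mu);
    return rc;
}

/* Consumer side, called by worker threads. Blocks until a
 * descriptor is available and returns it. */
int wq_pop(struct workqueue *q)
{
    int fd;

    pthread_mutex_lock(&q->mu);
    while (q->count == 0)
        pthread_cond_wait(&q->nonempty, &q->mu);
    fd = q->fds[q->head];
    q->head = (q->head + 1) % WQ_CAP;
    q->count--;
    pthread_mutex_unlock(&q->mu);
    return fd;
}
```

The event loop would call `wq_push` for each ready client descriptor, and each worker would loop on `wq_pop`. Because the push never blocks, the loop stays responsive even when the workers fall behind. One caveat: with a level-triggered registration the same descriptor keeps firing until a worker reads it, so a design like this would probably register client sockets with EV_ONESHOT (or disable them) and re-arm them after the worker is done.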

rici
  • First of all, thank you for your explanation. I understand, but my testing showed that the server then cannot accept multiple connections at once :(, and I'm quite unsure whether reading, writing and listening in the same thread makes much sense. For example, on a client read I need to parse the packet, then do some mysql queries and then create a response; during that time no client could connect to my server... so is it really done this way? – Marco Aug 10 '14 at 16:54
  • @d3l: Then you are doing something wrong :) Perhaps you need to ask a different question with a minimal compilable example. – rici Aug 10 '14 at 17:07
  • I made a little example explaining the problem. Greetings – Marco Aug 10 '14 at 17:40
  • @rici I liked the question, I have asked a similar question just now myself. But the question that I think your answer addressed is - can multiple threads add file descriptors to the kqueue (are kqueues thread safe)? If so how would one go about doing this? Also if OP was able to find a way to do this could he/she share it with me? – Curious Jan 02 '16 at 02:21
  • @rici A clarification to the above comment. I do not want to create a circular dependency between the kqueue module and the TCP logic. I wanted to do this in a manner that keeps them both as separate entities. If I used what you said about the accept call adding the file descriptor to the kqueue, then that logic would have to be added to the kqueue module – Curious Jan 02 '16 at 02:27
  • @curious: I'm pretty sure kqueue is thread-safe. The interesting work is done inside the kernel, after all, so potential data races would be in kernel code; that's unacceptable. But I don't have a FreeBSD handy at the moment to test it with. – rici Jan 02 '16 at 02:29
  • @rici Thank you for the quick reply! Even though you do not have a BSD to test this on, you are clearly more knowledgeable than I am! So I will ask you my question. It boils down to this: epoll has a function, `epoll_ctl()`, that can be used to declare interest in getting notified of events on a certain file descriptor. Can one imitate this feature with kqueues? Is it done by calling `kevent(kqueue_fd, list_of_fd_to_add, size_of_list, 0,0,0)`? Where 0s are passed for the event list that is returned on a blocking call, and a timeout pointing to null indicating that the call must not block. – Curious Jan 02 '16 at 02:36
  • @rici I think I just got my question answered here http://eradman.com/posts/kqueue-tcp.html which suggests that what I said was correct. If you think I was wrong, please let me know! Thanks! – Curious Jan 02 '16 at 03:01
  • @curious: if you don't ask for any events to return, the call won't block. I don't think the timeout is relevant in that case but I could be wrong. Check `man kevent` – rici Jan 02 '16 at 03:27
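The `epoll_ctl()`-style registration discussed in the last few comments might look like the sketch below. It assumes a BSD-family or macOS system; on other platforms the helper compiles to a stub that fails with ENOSYS. The name `kq_watch_read` and the `HAVE_KQUEUE` macro are hypothetical. As far as I know, the FreeBSD man page states that kevent returns immediately when nevents is zero, which matches the guess above, but check `man kevent` on your system.

```c
#include <errno.h>
#include <stddef.h>

#if defined(__APPLE__) || defined(__FreeBSD__) || \
    defined(__OpenBSD__) || defined(__DragonFly__)
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#define HAVE_KQUEUE 1   /* hypothetical feature macro for this sketch */
#endif

/* Register interest in readability of fd on the queue kq, roughly what
 * epoll_ctl(epfd, EPOLL_CTL_ADD, fd, ...) does on Linux.
 * Returns 0 on success, -1 with errno set on failure. */
int kq_watch_read(int kq, int fd, void *udata)
{
#ifdef HAVE_KQUEUE
    struct kevent change;

    EV_SET(&change, fd, EVFILT_READ, EV_ADD, 0, 0, udata);
    /* No eventlist is passed (nevents == 0), so the call only applies
     * the change and does not wait for events. */
    return kevent(kq, &change, 1, NULL, 0, NULL);
#else
    (void)kq; (void)fd; (void)udata;
    errno = ENOSYS;     /* no kqueue on this platform */
    return -1;
#endif
}
```

A listening thread could call this helper on a worker's kqueue descriptor after each `accept`, which is the arrangement asked about in the question.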