6

I want to use libev with multiple threads for the handling of tcp connections. What I want to is:

  1. The main thread listen on incoming connections, accept the connections and forward the connection to a workerthread.

  2. I have a pool of workerthreads. The number of threads depends on the number of cpu's. Each worker-thread has an event loop. The worker-thread listen if I can write on the tcp socket or if somethings available for reading.

I looked into the documentation of libev and I known this can be done with libev, but I can't find any example how I have to do that.
Does someone has an example? I think that I have to use the ev_loop_new() api, for the worker-threads and for the main thread I have to use the ev_default_loop() ?

Regards

user2004360
  • 209
  • 2
  • 3
  • 8
  • Does nobody has done this before ? since I can't find any example how to dispatch a connection to another thread using lib_ev ? I tried using ev_async_send but I get "libev: pipe_w not active, but pipe not written" – user2004360 Jan 31 '13 at 17:31
  • Where's that libev documentation you're talking about? I can't find any :/ (except some man page). – SasQ Aug 24 '20 at 18:32

2 Answers2

8

The following code can be extended to multiple threads

//This program is demo for using pthreads with libev.
//Try using Timeout values as large as 1.0 and as small as 0.000001
//and notice the difference in the output

//(c) 2009 debuguo
//(c) 2013 enthusiasticgeek for stack overflow
//Free to distribute and improve the code. Leave credits intact

#include <ev.h>
#include <stdio.h> // for puts
#include <stdlib.h>
#include <pthread.h>

pthread_mutex_t lock;
double timeout = 0.00001;
ev_timer timeout_watcher;
int timeout_count = 0;

ev_async async_watcher;
int async_count = 0;

struct ev_loop* loop2;

void* loop2thread(void* args)
{
    printf("Inside loop 2"); // Here one could initiate another timeout watcher
    ev_loop(loop2, 0);       // similar to the main loop - call it say timeout_cb1
    return NULL;
}

static void async_cb (EV_P_ ev_async *w, int revents)
{
    //puts ("async ready");
    pthread_mutex_lock(&lock);     //Don't forget locking
    ++async_count;
    printf("async = %d, timeout = %d \n", async_count, timeout_count);
    pthread_mutex_unlock(&lock);   //Don't forget unlocking
}

static void timeout_cb (EV_P_ ev_timer *w, int revents) // Timer callback function
{
    //puts ("timeout");
    if (ev_async_pending(&async_watcher)==false) { //the event has not yet been processed (or even noted) by the event loop? (i.e. Is it serviced? If yes then proceed to)
        ev_async_send(loop2, &async_watcher); //Sends/signals/activates the given ev_async watcher, that is, feeds an EV_ASYNC event on the watcher into the event loop.
    }

    pthread_mutex_lock(&lock);     //Don't forget locking
    ++timeout_count;
    pthread_mutex_unlock(&lock);   //Don't forget unlocking
    w->repeat = timeout;
    ev_timer_again(loop, &timeout_watcher); //Start the timer again.
}

int main (int argc, char** argv)
{
    if (argc < 2) {
        puts("Timeout value missing.\n./demo <timeout>");
        return -1;
    }
    timeout = atof(argv[1]);

    struct ev_loop *loop = EV_DEFAULT;  //or ev_default_loop (0);

    //Initialize pthread
    pthread_mutex_init(&lock, NULL);
    pthread_t thread;

    // This loop sits in the pthread
    loop2 = ev_loop_new(0);

    //This block is specifically used pre-empting thread (i.e. temporary interruption and suspension of a task, without asking for its cooperation, with the intention to resume that task later.)
    //This takes into account thread safety
    ev_async_init(&async_watcher, async_cb);
    ev_async_start(loop2, &async_watcher);
    pthread_create(&thread, NULL, loop2thread, NULL);

    ev_timer_init (&timeout_watcher, timeout_cb, timeout, 0.); // Non repeating timer. The timer starts repeating in the timeout callback function
    ev_timer_start (loop, &timeout_watcher);

    // now wait for events to arrive
    ev_loop(loop, 0);
    //Wait on threads for execution
    pthread_join(thread, NULL);

    pthread_mutex_destroy(&lock);
    return 0;
}
enthusiasticgeek
  • 2,640
  • 46
  • 53
3

Using libev within different threads at the same time is fine as long as each of them runs its own loop[1]. The c++ wrapper in libev (ev++.h) always uses the default loop instead of letting you specify which one you want to use. You should use the C header instead (ev.h) which allows you to specify which loop to use (e.g. ev_io_start takes a pointer to an ev_loop but the ev::io::start doesn't).

You can signal another thread's ev_loop safely through ev_async.

[1]http://doc.dvgu.ru/devel/ev.html#threads_and_coroutines

Till Varoquaux
  • 549
  • 5
  • 9
  • This is clear. do you have a example ? In the worker thread I start an eventloop as follows _eventLoop = ev_loop_new(); while(1) {ev_loop(_eventLoop,0); } But if I add a watcher to this loop using ev_io_start it takes a long time before I get a notification from the watcher. – user2004360 Jan 31 '13 at 11:47
  • I do not know of any good example of multi-event loop uses of libev. It is, however, a fairly idiomatic use of event loops (nginx for instance is built with this infrastructure). The author of libev has recommended this multi-threaderd/multi-loop several times on the mailling list. Without knowing anymore about your application I cannot tell you what might be wrong. – Till Varoquaux Jan 31 '13 at 18:29
  • @user2004360 I know this is an oldie, but the error you got is because you probably didn't call ev_async_start() on your watcher (in case anyone else gets here looking for this :) – gnobal Jan 19 '15 at 15:46
  • 1
    link is broken :/ – stack user Jun 20 '17 at 12:59