
I am working on designing a WebSocket server which receives messages and saves them to an embedded database. For reading the messages I am using Boost.Asio. To save the messages to the embedded database I see a few options in front of me:

  1. Save the messages synchronously as soon as I receive them over the same thread.
  2. Save the messages asynchronously on a separate thread.

I am pretty sure the second option is what I want. However, I am not sure how to pass messages from the socket thread to the IO thread. I see the following options:

  1. Use one io_service per thread and use the post function to communicate between threads. Here I have to worry about lock contention. Should I?
  2. Use Linux domain sockets to pass messages between threads. No lock contention as far as I understand. Here I can probably use BOOST_ASIO_DISABLE_THREADS macro to get some performance boost.

Also, I believe it would help to have multiple IO threads which would receive messages in a round robin fashion to save to the embedded database.

Which architecture would be the most performant? Are there any other alternatives from the ones I mentioned?

A few things to note:

  • The messages are exactly 8 bytes in length.
  • Cannot use an external database. The database must be embedded in the running process.
  • I am thinking about using RocksDB as the embedded database.
– rahul
  • I would use boost.asio to communicate with devices etc, but if you need inter-thread communication I wouldn't consider it the right choice. Maybe you can just use a lock-free buffer as provided in boost.lockfree? The spsc_queue did it for me many times. I'd also think that would be the fastest way, in performance as well as in development. – Klemens Morgenstern May 24 '16 at 21:36

5 Answers


I don't think you want to use a unix socket, which is always going to require a system call and pass data through the kernel. That is generally more suitable as an inter-process mechanism than an inter-thread mechanism.

Unless your database API requires that all calls be made from the same thread (which I doubt) you don't have to use a separate boost::asio::io_service for it. I would instead create an io_service::strand on your existing io_service instance and use the strand::dispatch() member function (instead of io_service::post()) for any blocking database tasks. Using a strand in this manner guarantees that at most one thread may be blocked accessing the database, leaving all the other threads in your io_service instance available to service non-database tasks.

Why might this be better than using a separate io_service instance? One advantage is that having a single instance with one set of threads is slightly simpler to code and maintain. Another minor advantage is that using strand::dispatch() will execute in the current thread if it can (i.e. if no task is already running in the strand), which may avoid a context switch.

For the ultimate optimization I would agree that using a specialized queue whose enqueue operation cannot make a system call could be fastest. But given that you have network i/o by producers and disk i/o by consumers, I don't see how the implementation of the queue is going to be your bottleneck.

– rhashimoto

After benchmarking/profiling I found the Facebook folly implementation of an MPMC queue (folly::MPMCQueue) to be the fastest, by at least a 50% margin. If I use the non-blocking write method, the socket thread has almost no overhead and the IO threads remain busy. The number of system calls is also much lower than with other queue implementations.

The SPSC queue with a condition variable in boost is slower; I am not sure why. It might have something to do with the adaptive spin that the folly queue uses.

Also, message passing (UDP domain sockets in this case) turned out to be orders of magnitude slower, especially for larger messages. This might have something to do with the data being copied twice.

– rahul

You probably only need one io_service -- you can create additional threads which will process events occurring within the io_service by providing boost::asio::io_service::run as the thread function. This should scale well for receiving 8-byte messages from clients over the network socket.

For storing the messages in the database, it depends on the database & interface. If it's multi-threaded, then you might as well just send each message to the DB from the thread that received it. Otherwise, I'd probably set up a boost::lockfree::queue where a single reader thread pulls items off and sends them to the database, and the io_service threads append new messages to the queue when they arrive.

Is that the most efficient approach? I dunno. It's definitely simple, and gives you a baseline that you can profile if it's not fast enough for your situation. But I would recommend against designing something more complicated at first: you don't know whether you'll need it at all, and unless you know a lot about your system, it's practically impossible to say whether a complicated approach would perform any better than the simple one.

– Dave M.
  • The database operation can block for many milliseconds, which is why I run it in a separate thread. What about message passing? And a lock-free queue would mean an infinite loop that wastes CPU cycles. – rahul May 28 '16 at 05:33
  • I'm not a Boost expert, but it looks like `lockfree` only spins if the machine doesn't have atomic operations. Otherwise, my guess is that producers only block each other briefly, and the consumer (database thread) only blocks if there are no messages. Your message passing is just putting the messages on the `lockfree::queue`. The threads share address space so you don't need to do anything else. – Dave M. May 28 '16 at 06:23
  • On review of `lockfree`, `pop` is nonblocking, so you'd need a condition variable that the consumer can sleep on. Producers have to tickle the condvar every time they post a new message. – Dave M. May 28 '16 at 06:30
  • Would a condition variable not make it the same as a multi-threaded blocking queue? – rahul May 28 '16 at 06:32
  • The problem you need to avoid is contention between the producers and the consumer -- the producers are (or may be) receiving messages faster than the consumer can process them (put them into the database). So you need to make sure the producers don't have to wait for the consumer to finish before they can enqueue their current message and go on to get the next message. The `lockfree` queue lets the producers put messages on one end of the queue, no matter how many messages are there or what the consumer is doing. They enqueue the message and are free to do their next job. – Dave M. May 28 '16 at 15:26
  • The condition variable just provides a way to wake up the consumer if it ever catches up and saves all the messages in the database. It usually won't, if you have regular messages arriving, because it's slower to store in the database than to get the next one. But if the consumer ever catches up, it will be sitting there saying "is there any work? ... is there any work?" That's a busy loop to avoid. So your consumer will preferably work like this: [...dang, you can't format in a comment...I'll add another answer] – Dave M. May 28 '16 at 15:28
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <boost/lockfree/queue.hpp>

std::atomic<bool> Finished{false};
std::condition_variable cond_var;
std::mutex cond_mutex;

void Consumer( boost::lockfree::queue<uint64_t> &message_queue ) {
    // Connect to database...
    while (!Finished) {
        // add_to_database is a functor (not shown) that takes one message.
        message_queue.consume_all( add_to_database );
        // Use a timed wait to avoid missing a signal. It's OK to call
        // consume_all() even if there's nothing in the queue.
        std::unique_lock<std::mutex> lock(cond_mutex);
        cond_var.wait_for( lock, std::chrono::milliseconds(100) );
    }
}

void Producer( boost::lockfree::queue<uint64_t> &message_queue ) {
    while (!Finished) {
        uint64_t m = receive_from_network( ); // not shown
        message_queue.push( m );
        cond_var.notify_all( ); // wake the consumer if it went to sleep
    }
}
– Dave M.

Assuming that the constraint of using C++11 is not too hard in your situation, I would try using std::async to make an asynchronous call to the embedded DB.

– call me Steve
  • Note that it is up to the C++ runtime implementation whether `std::async` uses a thread pool or starts a new thread (e.g. see [Which std::async implementations use thread pools?](http://stackoverflow.com/questions/15666443/which-stdasync-implementations-use-thread-pools). When performance is a concern, in the former case you'll want to ensure (in some implementation-dependent way) that the pool size is sufficient. In the latter case you'll want to watch out for thread start up overhead. – rhashimoto Jun 01 '16 at 16:41
  • @rhashimoto I agree with your view too. I like mine because it allows starting with a naive approach that can later be made more sophisticated to address the problems encountered. – call me Steve Jun 02 '16 at 11:36