9

One of my threads writes data to a circular buffer and another thread needs to process this data ASAP. I was thinking of writing a simple spin loop like this (pseudo-code):

    while (true) {
        while (!a[i]) {
            /* do nothing - just keep checking over and over */
        }
        // process b[i]
        i++;
        if (i >= MAX_LENGTH) {
            i = 0;
        }
    }

Above, I'm using `a` to indicate that the data stored in `b` is available for processing. I should probably also set thread affinity for such a "hot" thread. Of course such a spin is very expensive in terms of CPU, but that's OK for me as my primary requirement is latency.

The question is: should I really write something like that, or do Boost or the STL offer something that:

  1. Is easier to use.
  2. Has roughly the same (or even better?) latency while occupying fewer CPU resources?

I think my pattern is so common that there should be a good implementation somewhere.

upd It seems my question is still too complicated. Let's just consider the case where I need to write some items to an array in arbitrary order and another thread should read them in the right order as the items become available. How do I do that?

upd2

I'm adding a test program to demonstrate what I want to achieve and how. At least on my machine it happens to work. I'm using `rand` to show that I cannot use a general queue and need an array-based structure:

    #include "stdafx.h"
    #include <cstdio>   // printf
    #include <cstdlib>  // rand
    #include <cstring>  // memset
    #include <string>
    #include <boost/thread.hpp>
    #include "windows.h" // for Sleep


    const int BUFFER_LENGTH = 10;
    int buffer[BUFFER_LENGTH];
    short flags[BUFFER_LENGTH];   // non-zero means buffer[i] is ready

    // Consumer: reads the slots strictly in index order.
    void ProcessorThread() {
        for (int i = 0; i < BUFFER_LENGTH; i++) {
            while (flags[i] == 0);   // spin until slot i is marked ready
            printf("item %i received, value = %i\n", i, buffer[i]);
        }
    }


    int _tmain(int argc, _TCHAR* argv[])
    {
        memset(flags, 0, sizeof(flags));
        boost::thread processor = boost::thread(&ProcessorThread);
        // Producer: fills slots in random order, possibly more than once.
        for (int i = 0; i < BUFFER_LENGTH * 10; i++) {
            int x = rand() % BUFFER_LENGTH;
            buffer[x] = x;
            flags[x] = 1;
            Sleep(100);
        }
        processor.join();
        return 0;
    }

Output:

    item 0 received, value = 0
    item 1 received, value = 1
    item 2 received, value = 2
    item 3 received, value = 3
    item 4 received, value = 4
    item 5 received, value = 5
    item 6 received, value = 6
    item 7 received, value = 7
    item 8 received, value = 8
    item 9 received, value = 9

Is my program guaranteed to work? How would you redesign it, perhaps using some existing structures from Boost/STL instead of an array? Is it possible to get rid of the "spin" without affecting latency?

Oleg Vazhnev
  • 4
    Why not just use a blocking bounded queue to exchange data between the threads? Every half-decent concurrency library should have one. (The corollary is: use a half-decent library of concurrency collections and utilities when writing multithreaded code.) – millimoose Apr 25 '13 at 08:26
  • 8
    Are you sure you need threads if you're concerned about latency? – Alexey Frunze Apr 25 '13 at 08:27
  • 2
    How sensitive is the latency - what is your upper limit where your code stops working? How long does it take to process, and how much time do you spend waiting (as a proportion and max/min of "per message")? If you say "don't know", then you need to do some research before even asking this question. – Mats Petersson Apr 25 '13 at 08:27
  • @millimoose because 1. blocking queues are slloooooow. 2. I already have a circular buffer which can be read, and there is no reason to introduce an extra `queue`. – Oleg Vazhnev Apr 25 '13 at 08:27
  • @AlexeyFrunze yes, I need to return from third-party callbacks ASAP. So while my thread processes the available packets from the circular buffer, the original threads decode and place new packets. – Oleg Vazhnev Apr 25 '13 at 08:28
  • @millimoose: Because that would probably incur a much higher latency? @javapowered: You are using `std::atomic` for `i`, right? because otherwise your code would be broken very badly. – Grizzly Apr 25 '13 at 08:28
  • 3
    @javapowered 1. I call bullshit. I doubt a good lock-free queue implementation will be any slower than a correct spinlock. 2. So your argument for reinventing the wheel is that you already reinvented the axle? Anyway. If you want maximum throughput and you're not IO-bounded, don't use threads. If you're doing any IO on your data, the synchronisation overhead will probably be negligible. – millimoose Apr 25 '13 at 08:29
  • @MatsPetersson I just need to do things ASAP. It's trading, and the sooner you do things the more money you earn. Even 1 nanosecond is better than 2 nanoseconds. This particular task I want to do in 1 microsecond or so. – Oleg Vazhnev Apr 25 '13 at 08:31
  • @Grizzly no need to use `std::atomic` for `i` as it is used inside one thread only. – Oleg Vazhnev Apr 25 '13 at 08:31
  • @millimoose i have to use threads. i can not do everything i need in third-party CALLBACK! in callback I just decode data and place it in circular buffer. – Oleg Vazhnev Apr 25 '13 at 08:33
  • 1
    @javapowered Do keep in mind that with this solution the **entire** circular buffer will have to be *safely-published* between threads in this case. (I.e. the memory model will have to make sure that all of `b[i]` has been written to memory, not just the pointer change.) I'm not great at these limits of performance and how cache coherency is implemented, but this sounds *really* slow. – millimoose Apr 25 '13 at 08:35
  • What does "process b[i]" actually involve? – Mats Petersson Apr 25 '13 at 08:35
  • 1
    Uh-oh, have you heard that they already use custom-designed hardware for this and run the code pretty much in the network card driver (if not in the network chip!) and it's written in assembly language? ;) – Alexey Frunze Apr 25 '13 at 08:35
  • @MatsPetersson "What does "process b[i]" actually involve" - read data from circular-buffer and place it to shared memory. – Oleg Vazhnev Apr 25 '13 at 08:36
  • @millimoose: General-purpose code is typically slower than special-purpose code. This is often negligible, but when it comes to locks/atomic ops, having some more of those can have a high impact. So calling "bullshit" is quite a bit too harsh. @javapowered: I misunderstood your code in that case and obviously meant to say you use `atomic` for `a[...]`, right?... – Grizzly Apr 25 '13 at 08:36
  • @AlexeyFrunze i heard about that of course, but my question is not about that :) – Oleg Vazhnev Apr 25 '13 at 08:36
  • 1
    I'm mentioning that because it looks like you're already behind with what you're doing and how. – Alexey Frunze Apr 25 '13 at 08:37
  • @Grizzly I'm not sure. Why can't I just use a regular `bool` for `a[...]`? Anyway, this is pseudo-code. – Oleg Vazhnev Apr 25 '13 at 08:38
  • @AlexeyFrunze i know. – Oleg Vazhnev Apr 25 '13 at 08:39
  • @Grizzly By "I call bullshit" I meant "show me the numbers comparing correct implementations". All this arguing is pure guesswork, especially seeing as @javapowered seems to not even be aware of the fact this will be hilariously broken without atomic writes or some sort of memory barrier. I'm guessing he'd need `atomic` writes to the buffer too. Also I'm guessing that with this approach, you could have some contention for the circular buffers. (Emphasis on *guessing*, there's no way to know without benchmarking the hell out of the alternatives, tracing low level thread activity.) – millimoose Apr 25 '13 at 08:39
  • After all, I now really think it would probably be easier to avoid the extra thread and place the data into shared memory directly in the callback. Well, I'm still waiting for suggestions on how to pass data from an array-based circular buffer to another thread with minimal latency, and on how hard, slow and expensive such an operation would be. – Oleg Vazhnev Apr 25 '13 at 08:47
  • 5
    @javapowered "*it's trading and [...] even 1 nanosecond is better than 2 nanoseconds*" You **do** realize that to be of any use, your application needs to communicate with the real world and that *any* I/O will completely drown your (premature) optimization because of the tens or hundreds of milliseconds the I/O takes, right? – syam Apr 25 '13 at 08:48
  • Assuming `b[i]` being copied to shared memory involves less than many kilobytes, I'd say "not using threads" would be the fastest method. – Mats Petersson Apr 25 '13 at 08:48
  • 1
    @Grizzly Basically, a specialized implementation should aim to fix *known* problems with a general one. Right now **nothing** is known about the performance characteristics of either implementation, except that the one outlined above is probably broken since it doesn't synchronise reads and writes to the circular buffer. (@javapowered: Which you **have** to do. "Memory is shared between threads" is an illusion that has some holes in it, especially in the face of multiple cores and CPU optimizations. A side effect of using existing synchronisation primitives is that they close these holes.) – millimoose Apr 25 '13 at 08:48
  • @Grizzly there is no need to synchronize reads and writes: if `a[i]` is set I can only read, and if `!a[i]` I can only write. By the way, the code above should probably set `a[i]` to false once the data is read. – Oleg Vazhnev Apr 25 '13 at 08:51
  • 2
    @javapowered AFAIK, without any synchronisation, what could happen is that when you create a new object, and write a pointer to it to `a[i]`, the compiler / CPU can reorder instructions in such a way that the pointer is written first, and the memory it points to is "filled in" later. Now, imagine what happens when the threads are switched between these two events. Suddenly your reading thread will see that `a[i]` isn't `NULL`, and go and process utter garbage. – millimoose Apr 25 '13 at 08:53
  • @millimoose that's why I've asked this question. I'm looking for some implementation that does what I want to do but is not as buggy as mine :) – Oleg Vazhnev Apr 25 '13 at 08:54
  • 2
    @javapowered Like, say, a blocking bounded queue from a known and tested concurrency library? My larger point is that low-level concurrency is **hard**, I've probably omitted many important concepts in what I already said. If you want to push the performance of this to the edge, you should sit down with an actual book on C++ concurrency, learn the C++ memory model in-depth, write several alternative implementations after you know how to make them correct, and compare them. – millimoose Apr 25 '13 at 08:54
  • @millimoose I can't use a queue as the packets that I receive are unordered and I need to restore the order. Moreover, I receive duplicate packets in two threads. You can check my implementation here http://stackoverflow.com/questions/16202472/avoiding-collisions-when-collapsing-infinity-lock-free-buffer-to-circular-buffer I'm trying to implement the `Process` method in this question. – Oleg Vazhnev Apr 25 '13 at 08:58
  • My full path looks as follows: 1. receive a UDP datagram 2. drop it if already received, otherwise decode it into my own structures 3. restore the order and map my own structures to shared memory 4. go to 1 and receive the next UDP datagram. While I spend time in the network driver callback, it queues new datagrams. So I've decided that between 2 and 3 is a good place to move work to another "working" thread, as this is where I merge datagrams from two sources into one. – Oleg Vazhnev Apr 25 '13 at 09:11
  • @javapowered: No, that won't do it. You need atomics in order to prevent reordering. Another point: the way I understand your code (you get your objects one at a time), you will almost certainly want to make sure the alignment of the elements of `a` and `b` is a multiple of the cache line size to avoid false sharing. – Grizzly Apr 25 '13 at 09:13
  • "create a new object, and write a pointer to it to a[i]" - I actually always store the same object inside a[i]. I only reconfigure it every time. The object itself contains only primitive types, so I hope that the compiler/CPU will not reorder that. – Oleg Vazhnev Apr 25 '13 at 11:22
  • 1
    @javapowered I misunderstood your `while (!a[i])` construct though. Do you have anything you're basing your hope on? Why *wouldn't* the optimiser reorder those field writes if there's no dependency between them to constrain the order? And if you reconfigure the object it's **even worse**. Threads could switch between writes to two fields of the same object, that's *glaringly* broken. (That's assuming the change will be propagated between threads at all, since you're not using a memory barrier of any kind.) – millimoose Apr 25 '13 at 11:34
  • 1
    Side note: don't busy wait, at least use a small sleep. – Shahbaz Apr 25 '13 at 11:44
  • @Shahbaz no small sleep! i'm writing HFT trading software! – Oleg Vazhnev Apr 25 '13 at 11:45
  • @millimoose you're probably right. I think my question needs to be simplified (I've added `upd` to the description). I think I will write a small test program to 1. demonstrate what I want to achieve 2. check if things work. – Oleg Vazhnev Apr 25 '13 at 11:47
  • @javapowered There's a Herb Sutter talk on Channel9 that I think goes over the nitty gritty of C++ concurrency, I'd start there: http://channel9.msdn.com/Shows/Going+Deep/Cpp-and-Beyond-2012-Herb-Sutter-atomic-Weapons-1-of-2. I *think* the basic idea is "use some form of memory barrier". I believe atomics, mutexes+semaphores, correctly implemented concurrent collections, etc. already take care of setting those up. The "checking if things work" is a problem because multithreading bugs are inherently nondeterministic, so "follow best practice" is the only sane way of preventing them. – millimoose Apr 25 '13 at 12:03
  • @millimoose thanks for reference. i'm watching presentation. i think i will attach my test program after several hours. – Oleg Vazhnev Apr 25 '13 at 12:06
  • @javapowered The wiki article on memory barriers also explains what can happen when you're not sharing data safely: http://en.wikipedia.org/wiki/Memory_barrier Another important concept is the "memory model", although there you'll probably have to look for C++11 specific explanations, since it's not a standardised thing. – millimoose Apr 25 '13 at 12:11
  • @millimoose I've added my implementation. So far it works fine. I'm not sure if I should use `std::atomic` for both `buffer` and `flags`. Probably someone can explain. Also I'm looking for some library that lets me do things like that through an API. – Oleg Vazhnev Apr 25 '13 at 16:03
  • @javapowered "I'm not sure if I should use `std::atomic` for both `buffer` and `flags`." **For the love of god, yes.** It's the main thing me and Grizzly have been telling you. You **simply cannot** share memory between threads without using *any* synchronization mechanism that would impose some sort of memory access ordering. The behaviour of your code is *undefined* in the face of optimisations and instruction reordering. Without memory barriers, you are only guaranteed that a memory location is written before it's read in a single thread. – millimoose Apr 25 '13 at 16:22
  • @millimoose: Actually with the correct memory order semantics using atomic only for the flag should be fine. The Release semantics on the store to `flags` would make sure that the write to `buffer` has completed beforehand, while the Acquire on the load would ensure that the load from `buffer` is not reordered before the load from `flags` making sure that the right value is seen. Of course I would doublecheck this before I would call that certain. @javapowered: As you see programming with atomics is quite tricky, so ensure that you know what you are doing. – Grizzly Apr 25 '13 at 16:32
  • With the correct atomics that design should be ok and probably the lowest latency solution you can get aside from the cache line effects I mentioned earlier (if several threads operate on elements from `buffer` and at least one is doing writes, you will be constantly transfering cachelines, thus reducing overall performance, so depending on how often you get updates, you might want to ensure every element is in its own cacheline) – Grizzly Apr 25 '13 at 16:34
  • @Grizzly I'll admit the low-level details of this are lost on me. But the OP isn't using `atomic` or anything *anywhere*, and that's certain to be wrong. My expletive was meant to say "do something except what you're doing now which is nothing". And, going by your comment, microoptimising at this level requires knowing the memory model in detail and being able to justify decisions like forgoing one side of a "data exchange". – millimoose Apr 25 '13 at 16:37
  • @millimoose: I certainly won't dispute that. When writing such low level thread communication there are IMO only two ways to get it right: Either you know exactly what you are doing or you farm out that expertise by letting someone who does know what he/she is doing verify your code. javapowered: If you use Boost.Thread for your threads, why would you use the windows sleep function over the one in boost? – Grizzly Apr 25 '13 at 16:49
  • @Grizzly I've tried to find the Boost thread sleep function but did not succeed. I really don't want to use such low-level techniques in my example as using `std::atomic` for `flags`, because it's too hard to understand exactly how things work. Perhaps you can suggest array or circular-buffer implementations from Boost or other libraries that I can use instead? – Oleg Vazhnev Apr 25 '13 at 17:59
  • well probably I should just use disruptor? not sure however if c++ port of this project is good and alive... – Oleg Vazhnev Apr 25 '13 at 18:18
  • 1
    Might I add that if nano-seconds matter, I hope that you are directly, locally connected to the stock exchange's main servers? No matter how good your software is, if someone else is closer and running on lower level / lower latency hardware, you aren't going to win in a speed trading war. Interesting read thus far though and I've learned a few things myself about concurrency. – Michael Dorgan Apr 25 '13 at 18:46
  • 1
    @javapowered: You aren't allowed to say "I'm writing HFT software" if you haven't got anything that resembles it. What you're doing is prototyping, and you're doing it poorly. You're never going to succeed if you keep pulling random "I hope this" and "I require that" statements from thin air without a shred of data. Stop being lazy, write all the potential implementations, time them, compare them improve them. Do your research. – GManNickG Apr 25 '13 at 19:00
  • @javapowered [Boost.Lockfree](http://www.boost.org/doc/libs/1_53_0/doc/html/lockfree.html) has [`spsc_queue`](http://www.boost.org/doc/libs/1_53_0/doc/html/boost/lockfree/spsc_queue.html) that's implemented as a ringbuffer, and is wait-free. This means that you can busy-wait until either there's free space in the producer, or items in the queue in the consumer. (Whether this is a good idea I don't know, but it's the closest to what you're doing now.) Alternately, the regular lockfree queue class *seems* to support blocking operations as well as a bounded capacity. – millimoose Apr 25 '13 at 19:02
  • Also, a spin lock *may* worsen your latency because you're burning cycles on doing nothing, which could be used to do something else that's useful. Note I said "may": it depends on what you're doing and only you can figure out which is best. And you're going to do it by trying them out and comparing *with numbers*. Sometimes a spin lock is the right solution. Usually it isn't. – GManNickG Apr 25 '13 at 19:02
  • @MichaelDorgan yes, I do colocate in the same building as the stock exchange. Also, you can treat my question as academic; just concentrate on the subject instead of trying to find where else I might be wrong :) – Oleg Vazhnev Apr 25 '13 at 19:14
  • @millimoose thanks for the reference. What I actually need is a "two-writer one-reader array-based buffer" (as I receive packets in the wrong order). Or at least I need a "two-writer one-reader queue"... I should probably use disruptor. – Oleg Vazhnev Apr 25 '13 at 19:19
  • 2
    @javapowered The regular `queue` will accept however many producers / consumers you throw at it. And according to the documentation, if you use the `fixed_size` policy (whatever those are in Boost) it will be bounded and backed by an array. This service provided by Reading The Documentation For You, Inc. – millimoose Apr 25 '13 at 20:11
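The release/acquire idea from Grizzly's comment above can be sketched in standard C++11 roughly as follows. This is a minimal sketch, not a drop-in fix for the test program: it assumes one producer, one consumer, and that each slot is written exactly once (the original test may write the same slot several times, which would need extra care). The names `publish`, `consume` and `run_demo` are hypothetical helpers introduced only for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

constexpr std::size_t kBufferLength = 10;

int buffer[kBufferLength];               // payload: plain, non-atomic ints
std::atomic<bool> flags[kBufferLength];  // one "ready" flag per slot

// Producer side: write the payload first, then publish it with a release store.
void publish(std::size_t slot, int value) {
    assert(slot < kBufferLength);
    buffer[slot] = value;
    flags[slot].store(true, std::memory_order_release);
}

// Consumer side: spin on an acquire load. Once it observes true, the payload
// written before the matching release store is guaranteed to be visible.
int consume(std::size_t slot) {
    assert(slot < kBufferLength);
    while (!flags[slot].load(std::memory_order_acquire)) {
        // busy-wait
    }
    return buffer[slot];
}

// Hypothetical driver: one thread fills the slots in a scrambled order while
// another thread reads them strictly in index order.
std::vector<int> run_demo() {
    for (auto& f : flags) f.store(false, std::memory_order_relaxed);

    std::vector<int> received;
    std::thread consumer([&received] {
        for (std::size_t i = 0; i < kBufferLength; ++i)
            received.push_back(consume(i));
    });

    // 7 is coprime with 10, so (i * 7) % 10 visits every slot exactly once.
    for (std::size_t i = 0; i < kBufferLength; ++i) {
        std::size_t slot = (i * 7) % kBufferLength;
        publish(slot, static_cast<int>(slot) * 100);
    }

    consumer.join();
    return received;
}
```

Calling `run_demo()` from `main` returns the values in slot order even though they were written out of order. Only the flags are atomic here; the payload relies on the release/acquire pairing for visibility.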

5 Answers

3

This is what Condition Variables were designed for. std::condition_variable is defined in the C++11 standard library.

What exactly is fastest for your purposes depends on your problem; You can attack it from several angles, but CVs (or derivative implementations) are a good starting point for understanding the subject better and approaching an implementation.
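A minimal sketch of the condition-variable approach, assuming a single producer and a single consumer exchanging `int` values. The `Channel` class, its `push`/`pop`/`close` methods and `run_cv_demo` are hypothetical names used only for illustration; they are not part of any library, and the sketch says nothing about how its latency compares with spinning.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical blocking channel built on std::mutex + std::condition_variable.
class Channel {
public:
    void push(int value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(value);
        }
        ready_.notify_one();  // wake the consumer if it is waiting
    }

    // Signals that no more items will arrive.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        ready_.notify_all();
    }

    // Blocks until an item is available; returns false once the channel is
    // closed and fully drained.
    bool pop(int& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !items_.empty() || closed_; });
        if (items_.empty()) return false;
        out = items_.front();
        items_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<int> items_;
    bool closed_ = false;
};

// Hypothetical driver: the consumer sleeps instead of spinning while it waits.
std::vector<int> run_cv_demo(int count) {
    assert(count >= 0);
    Channel channel;
    std::vector<int> received;

    std::thread consumer([&channel, &received] {
        int value = 0;
        while (channel.pop(value)) received.push_back(value);
    });

    for (int i = 0; i < count; ++i) channel.push(i);
    channel.close();
    consumer.join();
    return received;
}
```

The predicate passed to `wait` guards against spurious wake-ups, and the consumer uses no CPU while the channel is empty.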

justin
  • 1
    It is however not exactly a low latency operation, which means that this doesn't really answer the question – Grizzly Apr 25 '13 at 16:35
  • Condition variables are, essentially, locks. The OP's problem only requires locking when the buffer is empty or full, it should be possible to use lockfree approaches the rest of the time. – millimoose Apr 25 '13 at 16:39
  • @Grizzly if what you say is true… surely you can provide an answer that the OP can easily understand and implement, which also guarantees lower latency than CVs. so, how do *you* eliminate that spinning *and* achieve lower latency than CV while using two threads? – justin Apr 25 '13 at 18:56
  • @millimoose and locks can be implemented using atomic ops on some platforms. locks are not always bad. if used correctly, it can be very very fast. sure, more specific implementations can be dreamed up and tested for a specific purpose, but that wouldn't make much sense to describe before the OP understands topics like CVs and sharing data using this approach. plus, there really isn't enough information in the OP to give a very specific solution. get it correct before trying to make it faster ;) – justin Apr 25 '13 at 19:03
  • @justin Oh, I was coming from the correctness angle. You should (b)lock when you need to (b)lock, and going by the OP's description he certainly does when the buffer is full or exhausted. Except that's exactly what a bounded producer-consumer queue will do, correctly - it fits the desired semantics precisely, and is arguably the go-to solution to a producer-consumer scenario. The fact that lock-free collection implementations are widely available is, I hope, just a performance bonus, or at least no worse than locking around a primitive collection. – millimoose Apr 25 '13 at 19:20
3

If the consuming thread is put to sleep, it takes a few microseconds for it to wake up. This is process scheduler latency, which you cannot avoid unless the thread is busy-spinning as yours does. The thread also needs to run under a real-time FIFO scheduling policy (e.g. `SCHED_FIFO` on Linux) so that it is never descheduled while it is ready to run merely because it has exhausted its time quantum.

So, there is no alternative that could match latency of busy spinning.
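For illustration, a busy-spinning consumer in the spirit of the question's pseudo-code could look roughly like this. It is a sketch under stated assumptions: exactly one producer and one consumer, a wrap-around index, and a per-slot flag that the consumer clears after reading (as the asker suggests in the comments). `Slot`, `ring`, `kMaxLength` and `run_ring_demo` are hypothetical names, and thread affinity and scheduling policy are left out.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

constexpr std::size_t kMaxLength = 4;  // deliberately small so the index wraps

struct Slot {
    std::atomic<bool> full{false};  // true: value is ready for the consumer
    int value = 0;
};

Slot ring[kMaxLength];

// Hypothetical driver: passes `count` integers through the ring buffer.
std::vector<int> run_ring_demo(int count) {
    assert(count >= 0);
    for (auto& s : ring) s.full.store(false, std::memory_order_relaxed);

    std::vector<int> received;
    received.reserve(static_cast<std::size_t>(count));

    std::thread consumer([&received, count] {
        std::size_t i = 0;
        for (int n = 0; n < count; ++n) {
            while (!ring[i].full.load(std::memory_order_acquire)) {
                // busy-spin: keep checking, never sleep
            }
            received.push_back(ring[i].value);
            // Hand the slot back to the producer.
            ring[i].full.store(false, std::memory_order_release);
            if (++i >= kMaxLength) i = 0;
        }
    });

    std::size_t i = 0;
    for (int n = 0; n < count; ++n) {
        while (ring[i].full.load(std::memory_order_acquire)) {
            // the consumer has not emptied this slot yet
        }
        ring[i].value = n;
        ring[i].full.store(true, std::memory_order_release);
        if (++i >= kMaxLength) i = 0;
    }

    consumer.join();
    return received;
}
```

Both sides pair a release store with an acquire load on the flag, so the plain `value` field is never read and written concurrently. The cost is one fully occupied core per spinning thread.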

(It is surprising that you are using Windows; it is best avoided if you are serious about HFT.)

Maxim Egorushkin
  • I'm in the process of transferring to Windows. I need to get rid of C# and then I'm ready. I'm still looking for something ready to use, but it seems there is nothing except disruptor-cpp... – Oleg Vazhnev Apr 25 '13 at 21:07
1

Consider using the C++11 threading library if your compiler supports it, or the Boost equivalent if not. In your case, look especially at std::future with std::promise.

There is a good book about threading and C++11 threading library:

Anthony Williams. C++ Concurrency in Action (2012)

Example from cppreference.com:

    #include <iostream>
    #include <future>
    #include <thread>

    int main()
    {
        // future from a packaged_task
        std::packaged_task<int()> task([](){ return 7; }); // wrap the function
        std::future<int> f1 = task.get_future();  // get a future
        std::thread(std::move(task)).detach(); // launch on a thread

        // future from an async()
        std::future<int> f2 = std::async(std::launch::async, [](){ return 8; });

        // future from a promise
        std::promise<int> p;
        std::future<int> f3 = p.get_future();
        std::thread( [](std::promise<int>& p){ p.set_value(9); },
                     std::ref(p) ).detach();

        std::cout << "Waiting..." << std::flush;
        f1.wait();
        f2.wait();
        f3.wait();
        std::cout << "Done!\nResults are: "
                  << f1.get() << ' ' << f2.get() << ' ' << f3.get() << '\n';
    }

Ivan Aksamentov - Drop
  • 1
    I love futures to death but they're a high-level construct that's not even close to low-overhead. – millimoose Apr 25 '13 at 12:00
  • 1
    and the relevance of this is? – Nim Apr 25 '13 at 12:02
  • 2
    Also I doubt they're an appropriate mechanism here. Futures are for when the consumer pulls data from a long asynchronous producing task. Here the producer is continuously pushing data to a consumer thread. So, yeah, this answer is barely on point. – millimoose Apr 25 '13 at 12:27
0

If you want a fast method then simply drop to making OS calls. Any C++ library wrapping them is going to be slower.

e.g. On Windows your consumer can call WaitForSingleObject(), and your data-producing thread can wake the consumer using SetEvent(). http://msdn.microsoft.com/en-us/library/windows/desktop/ms687032(v=vs.85).aspx

For Unix, here is a similar question with answers: Windows Event implementation in Linux using conditional variables?

Graham Perks
  • I think any synchronization will cost me about 5-15 microseconds. Instead, I should probably spend one CPU core but have latency of less than 1 microsecond... – Oleg Vazhnev Apr 25 '13 at 13:42
  • 2
    @javapowered: Where are you pulling those numbers from? Why 1 microsecond? Why such round even numbers? Sounds like you're just making stuff up. – GManNickG Apr 25 '13 at 18:53
  • 10-15 microseconds is what I generally spend when using BlockingCollection in C#. I guess that any other "fair lock" approach will cost something like that. – Oleg Vazhnev Apr 25 '13 at 19:10
  • 1
    Consider that if you're hammering one CPU core, the CPU may throttle everything back to keep cool. That may hurt your overall timing. – Graham Perks Apr 26 '13 at 01:24
0

Do you really need threading?

A single threaded app is trivially simple and eliminates all the issues with thread safety and the overhead of launching threads. I did a study of threaded vs non threaded code to append text to a log file. The non threaded code was better in every measure of performance.

Jay
  • By definition I have two threads already, because I have to receive the same data from two UDP multicast sockets. – Oleg Vazhnev Apr 25 '13 at 21:31
  • I guess I misunderstand this comment. Why does receiving from two sockets require two threads? – Jay Apr 26 '13 at 04:50
  • Good question, and the answer, I think, is: because you CAN. I think you have to do that in parallel. Assume on one socket you receive packet 1 and on another socket you receive packet 2. You can now parse and decode both packets in parallel, so overall latency is as low as possible. If you are using just one thread, packet 2 has to wait until packet 1 is processed. – Oleg Vazhnev Apr 26 '13 at 06:08