I have an instrument that produces a stream of data; my code accesses this data through a callback, onDataAcquisitionEvent(const InstrumentOutput &data). The data processing algorithm is potentially much slower than the rate of data arrival, so I cannot hope to process every single piece of data (and I don't have to), but I would like to process as many as possible. Think of the instrument as an environmental sensor whose data acquisition rate I don't control. InstrumentOutput could, for example, be a class that contains three simultaneous pressure measurements taken at different locations.
I also need to keep some short history of data. Assume for example that I can reasonably hope to process a sample of data every 200ms or so. Most of the time I would be happy processing just a single last sample, but occasionally I would need to look at a couple of seconds worth of data that arrived prior to that latest sample, depending on whether abnormal readings are present in the last sample.
The other requirement is to get out of the onDataAcquisitionEvent() callback as soon as possible, to avoid data loss in the sensor.
The data acquisition library (third party) collects the instrument data on a separate thread.
I thought of the following design: have a single-producer/single-consumer synchronized queue, and push the data tokens into it in the onDataAcquisitionEvent() callback.
On the receiving end, there is a loop that pops the data from the queue. The loop will almost never sleep because of the high rate of data arrival. On each iteration, the following happens:
- Pop all the available data from the queue,
- The popped data is copied into a circular buffer (I used boost circular buffer), this way some history is always available,
- Process the last element in the buffer (and potentially look at the prior ones),
- Repeat the loop.
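The steps above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the actual code: SampleQueue, History and consumeOnce are hypothetical names, the InstrumentOutput layout is a guess based on the description, and std::deque stands in for the boost circular buffer so that the sketch needs only the standard library.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <vector>

// Hypothetical stand-in for the instrument's sample type.
struct InstrumentOutput {
    double pressure[3];
};

// Minimal hand-off queue: the callback pushes, the consumer drains everything at once.
class SampleQueue {
public:
    // Called from the acquisition thread; the lock is held only for one push_back.
    void push(const InstrumentOutput& s) {
        {
            std::lock_guard<std::mutex> lock(m_);
            pending_.push_back(s);
        }
        cv_.notify_one();
    }

    // Blocks until at least one sample is pending, then moves all of them into 'out'.
    void drain(std::vector<InstrumentOutput>& out) {
        out.clear();
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !pending_.empty(); });
        out.swap(pending_);  // O(1) hand-off; pending_ is left empty
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::vector<InstrumentOutput> pending_;
};

// Bounded history; std::deque is used here in place of boost::circular_buffer.
class History {
public:
    explicit History(std::size_t capacity) : capacity_(capacity) {}

    // Appends a batch, dropping the oldest samples once the capacity is reached.
    void append(const std::vector<InstrumentOutput>& batch) {
        if (capacity_ == 0) return;
        for (const auto& s : batch) {
            if (buf_.size() == capacity_) buf_.pop_front();
            buf_.push_back(s);
        }
    }

    // Precondition: size() > 0.
    const InstrumentOutput& latest() const { return buf_.back(); }
    std::size_t size() const { return buf_.size(); }

private:
    std::size_t capacity_;
    std::deque<InstrumentOutput> buf_;
};

// One iteration of the consumer loop: pop everything, keep history, process.
template <typename Process>
void consumeOnce(SampleQueue& q, History& h,
                 std::vector<InstrumentOutput>& scratch, Process process) {
    q.drain(scratch);
    h.append(scratch);
    process(h);  // looks at h.latest() and, if needed, at older samples
}
```

In this sketch the callback would do nothing except call push(data), and the consumer thread would call consumeOnce() in a loop. Draining by swap keeps the time spent under the lock short on both sides, at the cost of one allocation pattern that a lock-free ring might avoid.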
Questions:
- Is this design sound, and what are its pitfalls?
- What could be a better design?
Edit: One problem I thought of is that the circular buffer may not be large enough to hold the needed history; currently I simply reallocate the circular buffer, doubling its size. I hope I will only need to do that once or twice.
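To make the doubling step concrete, here is a minimal standard-library sketch of a ring buffer that can grow while keeping its contents in oldest-to-newest order. GrowableRing is a hypothetical class written for illustration (it assumes T is default-constructible and copyable); boost::circular_buffer has a set_capacity() member that could serve the same purpose.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical fixed-capacity ring that overwrites the oldest element when full
// and can be grown explicitly.
template <typename T>
class GrowableRing {
public:
    explicit GrowableRing(std::size_t capacity)
        : data_(capacity), head_(0), size_(0) {}

    // Appends v; when the ring is full, the oldest element is overwritten.
    void push(const T& v) {
        if (data_.empty()) return;
        const std::size_t tail = (head_ + size_) % data_.size();
        data_[tail] = v;
        if (size_ == data_.size()) {
            head_ = (head_ + 1) % data_.size();  // the oldest slot was just reused
        } else {
            ++size_;
        }
    }

    // Doubles the capacity, copying elements so that index 0 stays the oldest.
    void grow() {
        std::vector<T> bigger(data_.empty() ? 1 : data_.size() * 2);
        for (std::size_t i = 0; i < size_; ++i) bigger[i] = (*this)[i];
        data_.swap(bigger);
        head_ = 0;
    }

    // Index 0 is the oldest retained element, size() - 1 the newest.
    const T& operator[](std::size_t i) const {
        return data_[(head_ + i) % data_.size()];
    }

    std::size_t size() const { return size_; }
    std::size_t capacity() const { return data_.size(); }

private:
    std::vector<T> data_;
    std::size_t head_;
    std::size_t size_;
};
```

Growing copies every retained element, so it is best done on the consumer thread, outside the callback. If the maximum arrival rate and the history window are roughly known, sizing the buffer up front may avoid the reallocation entirely.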