-1

I am looking for advice on how best to architect a buffer structure that can handle a massive amount of incoming data that is processed more slowly than it arrives.

I programmed a customized binary reader that can stream up to 12 million byte arrays per second on a single thread, and I am looking to process the byte array stream in a separate structure on the same machine but on a different thread. The problem is that the consuming structure cannot keep up with the amount of data coming from the producer, so I believe I need some sort of buffer to handle this properly. I am most interested in advice regarding the overall architecture rather than code examples. I target .NET 4.0. Here is more information about my current setup and requirements.

Producer: Runs on a dedicated thread and reads byte arrays from files on a physical storage medium (SSD, OCZ Vertex 3 Max IOPS). Approximate throughput is 12 million byte arrays per second. Each array is only 16 bytes in size. Fully implemented.

Consumer: Supposed to run on a separate thread from the producer. Consumes byte arrays but must parse them into several primitive data types before processing the data, so the processing speed is significantly slower than the producer's publishing speed. The consumer structure is fully implemented.

In between: Looking to set up a buffered structure that the producer can publish to and the consumer can, well, consume from. Not implemented.

I would be happy if some of you could comment, from your own experience or expertise, on what to consider when handling such a structure. Should the buffer implement a throttling algorithm that only requests new data from the producer when the buffer/queue is half empty or so? How are locking and blocking handled? I am sorry, I have very limited experience in this space. So far I have handled it by implementing a messaging bus, but every messaging bus technology I looked at is definitely unable to handle the throughput I am looking for. Any comments are very welcome!

Edit: Forgot to mention, the data is only consumed by one single consumer. Also the order in which the arrays are published does matter; the order needs to be preserved such that the consumer consumes in the same order.

Matt
  • If you can only have one consumer per producer, and your consumer is going to be slower than your producer, how are you going to avoid being perpetually behind, forever? Is there no way to process chunks of this data in parallel and then join the results in some way? – Joe Mar 03 '12 at 12:51
  • It won't, because the amount of data that is published is limited. It's a certain amount of data that is pushed and that's it. The goal of the exercise was never to speed up the consumer. I need guaranteed delivery to the consumer, I need to manage memory (so I can't just load the full data set into memory), and I need the hand-off from producer to consumer to be as fast as possible. The consumer processes the data at a given speed which can't be further optimized. – Matt Mar 03 '12 at 14:02

4 Answers

1

You can use a BlockingCollection<T> created with a bounded capacity. Once the collection is full, it blocks the producer from adding further items until the consumer has removed some.

There are other concurrent collection classes as well, e.g. ConcurrentQueue<T>.
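To make the blocking behaviour concrete, here is a minimal sketch of a bounded producer/consumer queue. It is written in Python, with `queue.Queue(maxsize=...)` standing in for a bounded BlockingCollection<T>; it is not .NET code. The names `CAPACITY`, `ITEMS`, `producer`, `consumer` and `run` are made up for illustration, and the records are generated in memory rather than read from disk.

```python
import queue
import threading

CAPACITY = 4   # hypothetical bound, kept tiny so the producer really does block
ITEMS = 20     # hypothetical number of 16-byte records to push through


def producer(q):
    for i in range(ITEMS):
        record = i.to_bytes(16, "big")  # stand-in for a 16-byte array read from a file
        q.put(record)                   # blocks while the queue already holds CAPACITY items
    q.put(None)                         # sentinel: no more data


def consumer(q, out):
    while True:
        record = q.get()                # blocks while the queue is empty
        if record is None:
            break
        out.append(int.from_bytes(record, "big"))  # stand-in for parsing the record


def run():
    q = queue.Queue(maxsize=CAPACITY)
    out = []
    threads = [
        threading.Thread(target=producer, args=(q,)),
        threading.Thread(target=consumer, args=(q, out)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return out


received = run()
```

With one producer and one consumer the FIFO queue preserves publication order, and the producer is released as soon as a single slot frees up.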

thumbmunkeys
  • Thanks for the pointers. So blocking refers to the publisher of a collection? What I am trying to avoid is a situation where the producer and consumer cannot access the collection concurrently. – Matt Mar 03 '12 at 12:10
  • They can access the collection concurrently; that's the point of these classes. You can also define how many items can be in the collection; if the collection is full, the producer is blocked. – thumbmunkeys Mar 03 '12 at 12:12
  • Would you know whether the decision algorithm for the un-blocking can be customized? For example, can I structure it so that it un-blocks when the collection is half-empty, rather than un-blocking as soon as a single data item is consumed/removed from the collection, when the collection is essentially still almost full? – Matt Mar 03 '12 at 12:15
  • The collection never blocks once it is no longer full. – thumbmunkeys Mar 03 '12 at 12:24
  • Hmm, that's I guess not what I want, because it would really allocate a lot of precious resources to the whole threading and blocking/unblocking mechanisms. Martin recommended considering chunks of such small 16 byte arrays, but I am still looking for a solution with more customization capabilities, a hybrid between what you described and a solution written from scratch. – Matt Mar 03 '12 at 12:40
  • you need some synchronisation between producer and consumer. I doubt you will find a non blocking solution unless you have infinite resources... – thumbmunkeys Mar 03 '12 at 12:43
  • Yes, I figured as much. I am just trying to assess whether I should write something from scratch or consider certain blocking collections as recommended. It gets even trickier because I am still considering potentially having a messaging bus in between. – Matt Mar 03 '12 at 12:49
1

16 bytes, (call it 16B), is too small for efficient inter-thread comms. Queueing up such small buffers will result in more CPU spent on inter-thread comms than on actual useful processing of the data.

So, chunk them up.

Declare some buffer class, (C16B, say), that contains a nice, big array of these 16B's - at least 4K's worth - and a 'count' int to show how many are loaded, (the last buffer loaded from a file will probably not be full). It will help if you place a cache-line-sized empty byte array just in front of this 16B array, which helps to avoid false sharing. You can maybe put the code that processes the 16B's in as a method, 'Process16B', say, and perhaps the code that loads the array too, taking a file descriptor as a parameter. This class can now be efficiently loaded up and queued to other threads.

You need a producer-consumer queue class - C# already has one in the BlockingCollection classes.

You need flow-control in this app. I would do it by creating a pool of C16B's - create a blocking queue and create/add a big pile of C16B's in a loop. 1024 is a nice, round number. Now you have a 'pool queue' that provides flow-control, avoids the need to new() any C16B's and you don't need them to be continually garbage-collected.

Once you have this, the rest is easy. In your loader thread, continually dequeue C16B's from the pool queue, load them up with data from the files and add() them off to the processing thread/s on a '16Bprocess' blocking queue. In the processing thread/s, take() from the 16Bprocess queue and process each C16B instance by calling its Process16B method. When the 16B's are processed, add() the C16B back to the pool queue for re-use.

The recycling of the C16B's via the pool queue provides end-to-end flow-control. If the producer is the fastest link, the pool will eventually empty and the producer will block there until the consumer/s returns some C16B's.
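The pool-queue scheme described above can be sketched roughly as follows. This is an illustration in Python, not .NET code: `queue.Queue` stands in for BlockingCollection, `Chunk` plays the role of C16B, and `loader`, `processor`, `run`, `POOL_SIZE` and `RECORDS_PER_CHUNK` are hypothetical names and values. The pool is kept at 8 chunks instead of 1024 to keep the example small, the "file" is an in-memory byte string, and cache-line padding is left out.

```python
import queue
import threading

RECORD_SIZE = 16
RECORDS_PER_CHUNK = 256   # 256 * 16 bytes = 4 KB of records per chunk
POOL_SIZE = 8             # hypothetical; the answer suggests something like 1024


class Chunk:
    """Reusable buffer (the 'C16B' role): many 16-byte records plus a fill count."""

    def __init__(self):
        self.data = bytearray(RECORD_SIZE * RECORDS_PER_CHUNK)
        self.count = 0    # number of valid records currently loaded


def loader(source, pool, work):
    view = memoryview(source)
    offset = 0
    while offset < len(source):
        chunk = pool.get()                        # blocks when every chunk is in flight
        n = min(len(chunk.data), len(source) - offset)
        chunk.data[:n] = view[offset:offset + n]  # copy the next run of records in
        chunk.count = n // RECORD_SIZE            # the last chunk may be only partly full
        offset += n
        work.put(chunk)                           # only a reference is queued
    work.put(None)                                # sentinel: end of data


def processor(pool, work, results):
    while True:
        chunk = work.get()
        if chunk is None:
            break
        for i in range(chunk.count):
            start = i * RECORD_SIZE
            record = chunk.data[start:start + RECORD_SIZE]
            results.append(int.from_bytes(record, "big"))  # stand-in for real parsing
        pool.put(chunk)                           # recycle: this is the flow control


def run(total_records):
    source = b"".join(i.to_bytes(RECORD_SIZE, "big") for i in range(total_records))
    pool, work = queue.Queue(), queue.Queue()
    for _ in range(POOL_SIZE):
        pool.put(Chunk())
    results = []
    threads = [
        threading.Thread(target=loader, args=(source, pool, work)),
        threading.Thread(target=processor, args=(pool, work, results)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results, pool.qsize()


results, chunks_back_in_pool = run(1000)
```

Neither queue needs a capacity limit here: at most `POOL_SIZE` chunks exist, so the loader stalls on `pool.get()` whenever the processor falls behind. With a single processing thread the records also come out in the order they were loaded.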

If the processing takes so much time, you could always add another processing thread if you have spare cores available. The snag with such schemes is that the data will get processed out-of-order. This may, or may not, matter. If it does, the data flow might need 'straightening out' later, e.g. using sequence numbers and a buffer-list.

I would advise dumping the pool queue count, (and maybe the 16Bprocess queue count as well), to a status component or command-line with a timer. This provides a useful snapshot of where the C16B instances are and you can see the bottlenecks and any C16B leaks without 3rd-party tools, (the ones that slow the whole app down to a crawl and issue spurious leak reports on shutdown).
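One way the 'straightening out' with sequence numbers might look is sketched below, again in Python for illustration only. The `Resequencer` class and its `push` method are hypothetical names; the sketch assumes every chunk is tagged with a consecutive sequence number, starting at 0, when it is loaded.

```python
class Resequencer:
    """Holds out-of-order results until every earlier sequence number has arrived."""

    def __init__(self):
        self.next_seq = 0   # sequence number that must be delivered next
        self.pending = {}   # results that arrived early, keyed by sequence number

    def push(self, seq, item):
        """Accept one result and return the items that are now deliverable, in order."""
        self.pending[seq] = item
        ready = []
        while self.next_seq in self.pending:
            ready.append(self.pending.pop(self.next_seq))
            self.next_seq += 1
        return ready


# Results arriving out of order from several (hypothetical) processing threads.
arrivals = [(2, "c"), (0, "a"), (1, "b"), (3, "d")]
resequencer = Resequencer()
ordered = []
for seq, item in arrivals:
    ordered.extend(resequencer.push(seq, item))
```

If several threads call `push` concurrently it would need a lock around it; the sketch assumes a single thread collects the results.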

Martin James
  • Martin, really appreciate your comments. Let me digest and I promise to get back with thoughts/results. Forgot to mention that the order in which the producer published matters and that it has to be consumed in the same order. But I find your comments re chunking up the 16B arrays quite interesting. It got me started to think that I may even try to publish such chunks onto my RabbitMQ bus. I currently max out at around 40,000 messages per second but if I publish chunks instead of individual 16B messages then it may work. I would still need to think about how to potentially block the producer. – Matt Mar 03 '12 at 12:34
  • Martin, thanks, I think your answer comes closest to what I was seeking. I actually ended up going the RabbitMQ route, but the chunking idea still helped a great deal, and I also had to implement a blocking algorithm anyway unless I guarantee message delivery (concurrency) through RabbitMQ, which would slow down message transfer by several degrees. – Matt Mar 05 '12 at 15:38
0

IMO a blocking queue of some kind may solve your problem. Essentially, the producer thread will block if the queue has no more capacity. Look at this question: "Creating a blocking Queue<T> in .NET?"

ajay
  • Would I not want to look at non-blocking algorithms? I do not want the queue to be locked just because the producer publishes new data onto the queue/collection while the consumer is prevented from doing its work. Or am I misunderstanding the concept of non-blocking? But I will read up on blocking queues, thanks for the suggestion. – Matt Mar 03 '12 at 12:05
  • If you design your buffer class properly, the queue will only be locked for the very short time taken to push a buffer reference on. The trick is to ensure that each buffer class contains a lot of data so that queue operations are minimized. Problems arise when developers use wide queues and keep the queue locked for ages while they copy in/out KB of data. If you queue up only reference types, there should be no problem. – Martin James Mar 03 '12 at 12:29
0

Why bother with a buffer at all? Use the disk files as a buffer. When the consumer starts processing a byte array, have the reader read the next one and that's it.

EDIT: After asking for decoupling of the consumer and producer.

You can have a coordinator that tells the producer to produce X byte arrays, and supplies X byte arrays to the consumer. The three parts can act like this:

First, the coordinator tells the producer to produce X byte arrays, and the producer produces X byte arrays.

And now do this in a loop:

1. Coordinator tells consumer to consume X byte arrays.
2. Coordinator tells producer to produce X byte arrays.
3. Consumer tells coordinator it's done consuming.
4. Loop until there are no more byte arrays.

The producer and coordinator can run in the same thread. The consumer should have its own thread.

You will have almost no locking (I think you can do this with no locking at all, just a single wait handle the consumer uses to notify the coordinator it's done), and your coordinator is extremely simple.
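A rough sketch of this coordinator loop, in Python for illustration rather than .NET, is shown below. The names `produce`, `run`, `slot` and `BATCH_SIZE` are hypothetical, the records come from an in-memory list instead of files, and the sketch uses two events (one per direction) rather than the single wait handle mentioned above.

```python
import threading

BATCH_SIZE = 4   # hypothetical value of X


def produce(source, n):
    """Producer step: read up to n records from the source iterator."""
    batch = []
    for _ in range(n):
        try:
            batch.append(next(source))
        except StopIteration:
            break
    return batch


def run(records):
    source = iter(records)
    consumed = []
    slot = {"batch": None}            # hand-off slot written only by the coordinator
    batch_ready = threading.Event()   # coordinator -> consumer: a batch is waiting
    batch_done = threading.Event()    # consumer -> coordinator: batch consumed

    def consumer():
        while True:
            batch_ready.wait()
            batch_ready.clear()
            batch = slot["batch"]
            if not batch:             # an empty batch means there is no more data
                return
            consumed.extend(batch)    # stand-in for the real processing
            batch_done.set()

    worker = threading.Thread(target=consumer)
    worker.start()

    # Coordinator and producer share this thread.
    batch = produce(source, BATCH_SIZE)
    while batch:
        slot["batch"] = batch
        batch_ready.set()                     # tell the consumer to consume X arrays
        batch = produce(source, BATCH_SIZE)   # produce the next X while it works
        batch_done.wait()                     # wait until the consumer reports it is done
        batch_done.clear()
    slot["batch"] = []
    batch_ready.set()                         # release the consumer so it can exit
    worker.join()
    return consumed


result = run(list(range(10)))
```

At most two batches exist at any time, one being consumed and one being read, so memory use stays bounded by roughly 2 × X records.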

REEDIT: Another really decoupled option

Use ZeroMQ for handling the communications. The producer reads byte arrays and posts each array to a ZeroMQ socket. The consumer reads arrays from a ZeroMQ socket.

ZeroMQ is very efficient and fast, and handles all the technicalities (thread synchronization, buffering, etc...) internally. When used on the same computer, you won't suffer any data loss, too (which might happen when using UDP on two different machines).

zmbq
  • Because I want to implement the overall structure and model it after a finite state machine. The producer and consumer should know nothing about each other. Thus your recommendation does not fit my purpose. – Matt Mar 03 '12 at 12:01
  • I added an example of a very simple architecture for the problem at hand. Two threads, no locking. – zmbq Mar 03 '12 at 15:20
  • You suggested basically the same thing that you came up with first. Your proposed solution does not decouple producer and consumer. In effect it would break the fundamental concept of a finite state machine. To play to your example, the coordinator should never have to tell the consumer what to do, nor should the consumer tell the producer that it's done. There is a buffer, and all the consumer does is check whether there is data to be processed. If yes, then process it. Same on the producer side: if the buffer is no longer full, then publish new data onto the buffer. – Matt Mar 03 '12 at 15:54
  • I'm not sure I follow. The consumer and producer don't know each other at all, they both are just aware of the coordinator. The coordinator is really a very simple buffer mechanism, because we leave most of the buffering to the file system. – zmbq Mar 03 '12 at 17:50
  • Hi, I tried through RabbitMQ, but even when I run the RabbitMQ server on my local machine it gets to process a maximum of about 45mb/second which is awfully slow for my purposes. Any idea how to improve on those numbers? – Matt Mar 05 '12 at 15:29
  • Is your problem I/O or CPU? As long as you have a single consumer, you're stuck. Try using ZeroMQ which is very efficient. If it doesn't help, you'll need several consumers in parallel. – zmbq Mar 05 '12 at 15:50