20

My app reads bytes from a TCP socket and needs to buffer them up, so that I can extract messages from them later. Due to the nature of TCP I may get partial or multiple messages in one read, so after each read I would like to inspect the buffer and extract as many full messages as are available.

Therefore I want a class that allows me to do the following:

  • append arbitrary byte[] data to it
  • inspect the content without consuming it, in particular checking the amount of content and also searching for the existence of a certain byte or bytes
  • extract and consume part of the data as a byte[], while leaving the rest in there for a future read

I expect that what I want can be done with 1 or more existing classes in the .NET library, but I'm not sure which ones. System.IO.MemoryStream looks close to what I want, but (a) it isn't clear whether it's suited to being used as a buffer (does the read data get removed from the capacity?) and (b) reads and writes seem to happen at the same place - "The current position of a stream is the position at which the next read or write operation could take place." - which is not what I want. I need to be writing to the end and reading from the front.

Kylotan
  • You want to append arbitrary byte arrays? Or arbitrary bytes? Regardless, maybe List<byte[]> or List<byte> will work for you – Jason Aug 19 '11 at 14:27
  • Byte arrays, but merged into a contiguous array of bytes. I don't think List<byte> will be efficient enough for this application. – Kylotan Aug 19 '11 at 14:33
  • I didn't know there were performance requirements; Jon's suggestion looks like a good one – Jason Aug 19 '11 at 14:34
  • Hmm, List<byte> might actually be fine, having checked the implementation, but the interface is a bit cumbersome. – Kylotan Aug 19 '11 at 14:39
  • What about BufferedStream? – KRoy Jan 11 '18 at 19:17

8 Answers

13

I suggest you use MemoryStream under the hood, but encapsulate it in another class which stores:

  • The MemoryStream
  • The current "read" position
  • The current "consumed" position

It will then expose:

  • Write: set the stream's position to the end, write data, set the stream's position back to the read position
  • Read: read data, set the read position to the stream's position
  • Consume: update the consumed position (details based on how you're trying to consume); if the consume position is above a certain threshold, copy the existing buffered data into a new MemoryStream and update all the variables. (You probably don't want to copy the buffer on every consume request.)

Note that none of this will be thread-safe without extra synchronization.
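
For illustration, a minimal sketch of that wrapper could look like the following (the class name, the Read signature and the compaction threshold are illustrative choices, not taken from the answer):

using System;
using System.IO;

// Rough sketch only: wraps a MemoryStream, appends writes at the end, reads at a
// separate read position, and compacts once enough data has been consumed.
public class MessageBuffer
{
    private MemoryStream _stream = new MemoryStream();
    private long _readPosition;                        // where the next Read starts
    private long _consumedPosition;                    // data before this is no longer needed
    private const long CompactThreshold = 64 * 1024;   // arbitrary example value

    public void Write(byte[] data)
    {
        _stream.Position = _stream.Length;             // append at the end
        _stream.Write(data, 0, data.Length);
        _stream.Position = _readPosition;              // restore the read position
    }

    public int Read(byte[] target, int offset, int count)
    {
        _stream.Position = _readPosition;
        int read = _stream.Read(target, offset, count);
        _readPosition = _stream.Position;
        return read;
    }

    public void Consume(long count)
    {
        _consumedPosition += count;
        if (_consumedPosition >= CompactThreshold)
        {
            // Copy the unconsumed tail into a fresh stream and rebase the positions.
            var remaining = new MemoryStream();
            remaining.Write(_stream.GetBuffer(), (int)_consumedPosition,
                            (int)(_stream.Length - _consumedPosition));
            _readPosition -= _consumedPosition;
            _consumedPosition = 0;
            _stream = remaining;
            _stream.Position = _readPosition;
        }
    }
}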

Jon Skeet
  • What is the advantage of using a `MemoryStream` under the hood, compared to a plain `byte[]` array? (ok, never mind, got it: it's resized automatically) – vgru Aug 19 '11 at 14:33
  • @Groo: The `MemoryStream` has an expandable capacity (if needed). – Oliver Aug 19 '11 at 14:38
  • What is a good method to copy the remainder of the existing MemoryStream to a new one? – Kylotan Aug 19 '11 at 15:02
  • @Kylotan: The *fastest* way is probably to call `MemoryStream.GetBuffer` to get the underlying buffer, then write a portion to the new stream using a normal `Write` call. – Jon Skeet Aug 19 '11 at 15:05
  • I accepted the answer below with example code posted, but thanks for this answer too, which appears equally correct. – Kylotan Aug 20 '11 at 14:11
  • Having done this once, I can't say I'd recommend it. In retrospect I realized I'd taken data out of a perfectly good buffer (NetworkStream) into another buffer (my ConcurrentMemoryStream) which I had to write, test, debug etc... and still hadn't really made life much easier for the client. – piers7 Mar 25 '15 at 09:25
  • What is the difference between the consumed and the read position? I did not find any consumed marker in MemoryStream; it has Begin, Current, End markers. – KRoy Jan 11 '18 at 19:16
  • @piers7: They're not part of MemoryStream. They're part of a suggested wrapper around that, for the purposes of the question. – Jon Skeet Jan 11 '18 at 19:18
  • What about BlockingCollection? In this scenario it can't be done without threads anyway, and they are thread-safe and can simulate a stack or a queue. Waiting for a response – Indigo_heart Dec 07 '20 at 20:32
  • What disadvantages are there if, instead of using MemoryStream, I use a List<byte>? – Indigo_heart Dec 07 '20 at 20:39
  • @Indigo_heart: I suspect it would be more complex code, basically. – Jon Skeet Dec 08 '20 at 06:40
12

Just use a big byte-array and Array.Copy - it should do the trick. If not, use List<byte>.

If you use the array you have to maintain an index into it yourself (the position where you copy additional data; the same goes for checking the content size), but it's straightforward.

If you are interested: here is a simple implementation of a "cyclic buffer". It should work (I threw a couple of unit tests at it, but they didn't cover every critical path):

using System;
using System.Collections.Generic;

/// <summary>
/// A fixed-capacity cyclic (ring) buffer: writes append after the last byte,
/// reads consume from the front, and both wrap around the underlying array.
/// </summary>
public class ReadWriteBuffer
{
    private readonly byte[] _buffer;
    private int _startIndex, _endIndex;

    public ReadWriteBuffer(int capacity)
    {
        _buffer = new byte[capacity];
    }

    public int Count
    {
        get
        {
            if (_endIndex > _startIndex)
                return _endIndex - _startIndex;
            if (_endIndex < _startIndex)
                return (_buffer.Length - _startIndex) + _endIndex;
            return 0;
        }
    }

    public void Write(byte[] data)
    {
        // ">=" keeps at least one slot free, so _startIndex == _endIndex
        // always means "empty" rather than "full".
        if (Count + data.Length >= _buffer.Length)
            throw new Exception("buffer overflow");
        // Wrapping case: fill up to the end of the array, then continue at the start.
        if (_endIndex + data.Length >= _buffer.Length)
        {
            var endLen = _buffer.Length - _endIndex;
            var remainingLen = data.Length - endLen;

            Array.Copy(data, 0, _buffer, _endIndex, endLen);
            Array.Copy(data, endLen, _buffer, 0, remainingLen);
            _endIndex = remainingLen;
        }
        else
        {
            Array.Copy(data, 0, _buffer, _endIndex, data.Length);
            _endIndex += data.Length;
        }
    }

    public byte[] Read(int len, bool keepData = false)
    {
        if (len > Count)
            throw new Exception("not enough data in buffer");
        var result = new byte[len];
        // Contiguous case: the requested range does not wrap around the end of the array.
        if (_startIndex + len < _buffer.Length)
        {
            Array.Copy(_buffer, _startIndex, result, 0, len);
            if (!keepData)
                _startIndex += len;
            return result;
        }
        else
        {
            var endLen = _buffer.Length - _startIndex;
            var remainingLen = len - endLen;
            Array.Copy(_buffer, _startIndex, result, 0, endLen);
            Array.Copy(_buffer, 0, result, endLen, remainingLen);
            if (!keepData)
                _startIndex = remainingLen;
            return result;
        }
    }

    public byte this[int index]
    {
        get
        {
            if (index < 0 || index >= Count)
                throw new ArgumentOutOfRangeException();
            return _buffer[(_startIndex + index) % _buffer.Length];
        }
    }

    public IEnumerable<byte> Bytes
    {
        get
        {
            for (var i = 0; i < Count; i++)
                yield return _buffer[(_startIndex + i) % _buffer.Length];
        }
    }
}

Please note: by default the code "consumes" on read - if you don't want that, pass keepData: true (or use the indexer / the Bytes enumerator to inspect the contents without copying).
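
For illustration, typical usage might look like this (the byte values and sizes are made up):

var buffer = new ReadWriteBuffer(4096);

buffer.Write(new byte[] { 1, 2, 3, 4 });   // e.g. bytes from one socket read
buffer.Write(new byte[] { 5, 6 });         // ...and a later, partial read

// Inspect without consuming: check the count and peek at individual bytes.
if (buffer.Count >= 4 && buffer[0] == 1)
{
    byte[] header = buffer.Read(4);                            // consumes 4 bytes
    byte[] peek = buffer.Read(buffer.Count, keepData: true);   // peeks at the rest
}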

Random Dev
  • Looks pretty good, with the one downside being that I could really do with a contiguous view of the whole buffer when deciding whether to call Read() or not. Is there a good way of implementing that without a copy? Maybe with an IEnumerable that iterates over both parts? – Kylotan Aug 19 '11 at 15:37
  • I updated the code so that you can opt to keep the data on a read - is this ok? (You can preview all the data with obj.Read(obj.Count, true) or, more readably, obj.Read(obj.Count, keepData: true).) – Random Dev Aug 19 '11 at 17:12
  • Yeah, I was aware I could do that, but I'd like to be able to avoid those copies each time. That's a lot of extra allocation and deallocation I could do without. (I'm a game programmer and this will get called a lot.) – Kylotan Aug 19 '11 at 19:16
  • Oh - the Array.Copy ... sorry. I added an indexer and an IEnumerable ... done – Random Dev Aug 19 '11 at 20:38
  • I'm probably not going to be able to use this because calling that function for every byte is going to be far too slow. Not sure what I'll do instead though! – Kylotan Aug 21 '11 at 16:55
  • Two suggestions to improve performance (I remember back when I did game development - you'll sell your family to get 60 fps). First, if the buffer will often be empty, then on a read that empties it, reset _endIndex to 0. Why? Because of the second suggestion: add a read call that returns an array/offset/length. If the data is contiguous it's the internal array; if not, then you have the create/copy. Then have a call to release what was read once you've used the bytes. – David Thielen Sep 25 '11 at 17:15
  • In the Write method, shouldn't the value of endLen be calculated using _buffer.Length - _endIndex, not _buffer.Length - data.Length? – Kyle Gagnet Feb 17 '12 at 04:09
  • Yes, you are right - sorry. Obviously I didn't have all the cases covered :( – Random Dev May 26 '12 at 06:32
  • Rather than throwing an exception on read I would simply return what is available (len = Count). On Write I would choose an OverflowException but that's just me. – Lord of Scripts Aug 30 '13 at 20:20
  • As you said: whatever is ok with you - but I would call this behaviour TryRead or ReadAvailable or whatever – Random Dev Sep 10 '13 at 12:30
3

I think BufferedStream is the solution to the problem. It is also possible to 'un-read' len bytes of data by calling Seek.

BufferedStream buffer = new BufferedStream(tcpStream, size); // we have a buffer of the given size
...
...
while(...)
{
    buffer.Read(...);
    // do my stuff
    // I read too much, I want to put back len bytes
    buffer.Seek(-len, SeekOrigin.Current);
    // I shall come back and read later
}

Growing memory

Unlike BufferedStream, whose size is specified up front, MemoryStream can grow.

Remembering streaming data

MemoryStream holds all the data read so far, while BufferedStream only holds a segment of the stream data.

Source stream vs byte array

MemoryStream lets you add input bytes via its Write() method and Read() them back later, while BufferedStream takes its input bytes from another source stream specified in the constructor.

KRoy
1

Here's another implementation of a buffer I wrote a while ago:

  • Resizable: allows queuing up data without throwing a buffer overflow exception;
  • Efficient: uses a single buffer and Buffer.BlockCopy operations to enqueue/dequeue data (a sketch of the idea follows below)
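
The linked implementation isn't reproduced here, but a minimal sketch of that kind of structure - a growable ring buffer over a single array that uses Buffer.BlockCopy to enqueue/dequeue - could look like this (all names and the initial capacity are illustrative):

using System;

public class ResizableFifo
{
    private byte[] _buffer = new byte[256];
    private int _head;   // index of the first unread byte
    private int _count;  // number of unread bytes

    public int Count { get { return _count; } }

    public void Enqueue(byte[] data, int offset, int length)
    {
        if (_count + length > _buffer.Length)
            Grow(_count + length);
        int tail = (_head + _count) % _buffer.Length;
        int firstChunk = Math.Min(length, _buffer.Length - tail);
        Buffer.BlockCopy(data, offset, _buffer, tail, firstChunk);
        Buffer.BlockCopy(data, offset + firstChunk, _buffer, 0, length - firstChunk);
        _count += length;
    }

    public int Dequeue(byte[] target, int offset, int length)
    {
        length = Math.Min(length, _count);
        int firstChunk = Math.Min(length, _buffer.Length - _head);
        Buffer.BlockCopy(_buffer, _head, target, offset, firstChunk);
        Buffer.BlockCopy(_buffer, 0, target, offset + firstChunk, length - firstChunk);
        _head = (_head + length) % _buffer.Length;
        _count -= length;
        return length;
    }

    private void Grow(int required)
    {
        // Double the capacity (or more if needed) and unwrap the data to the front.
        int newSize = Math.Max(_buffer.Length * 2, required);
        var newBuffer = new byte[newSize];
        int firstChunk = Math.Min(_count, _buffer.Length - _head);
        Buffer.BlockCopy(_buffer, _head, newBuffer, 0, firstChunk);
        Buffer.BlockCopy(_buffer, 0, newBuffer, firstChunk, _count - firstChunk);
        _buffer = newBuffer;
        _head = 0;
    }
}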
Kel
  • That would be great for my purposes if it allowed arbitrary-length peek operations. Otherwise it can be a bit tricky to know whether I can safely consume the data or not. – Kylotan Apr 18 '14 at 14:34
1

Coming to this late, but for posterity:

When I've done this in the past I've taken a slightly different approach. If your messages have a fixed-size header (one that tells you how many bytes are in the body), and bearing in mind that the network stream is already buffering, I perform the operation in two phases:

  • a read on the stream for the bytes for the header
  • a subsequent read on the stream for the bytes for the body, based on the header
  • repeat

This leverages the fact that - for a stream - when you ask for 'n' bytes you'll never get more back, so you can ignore many of the 'oops, I read too many, let me put these aside till next time' issues.

Now this isn't the whole story, to be fair. I had an underlying wrapper class over the stream to handle fragmentation issues (i.e. if asked for 4 bytes, don't return until 4 bytes have been received or the stream is closed). But that bit is fairly easy.
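
For illustration, that wrapper plus the two-phase loop might be sketched as follows, assuming a 4-byte little-endian length prefix (the helper names and the framing are illustrative, not from the answer):

using System;
using System.IO;

static class FramedReader
{
    // Blocks until exactly 'count' bytes have been read, or throws if the stream ends.
    private static byte[] ReadExactly(Stream stream, int count)
    {
        var buffer = new byte[count];
        int offset = 0;
        while (offset < count)
        {
            int read = stream.Read(buffer, offset, count - offset);
            if (read == 0)
                throw new EndOfStreamException("Connection closed mid-message.");
            offset += read;
        }
        return buffer;
    }

    // Phase 1: read the fixed-size header; phase 2: read the body it describes.
    public static byte[] ReadMessage(Stream stream)
    {
        byte[] header = ReadExactly(stream, 4);          // assumed 4-byte length prefix
        int bodyLength = BitConverter.ToInt32(header, 0);
        return ReadExactly(stream, bodyLength);
    }
}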

In my mind the key is to decouple the message handling from the stream mechanics; if you stop attempting to consume the message as a single ReadBytes() from a stream, life becomes much simpler.

[all of this is true whether your reads are blocking, or async (APM/await)]

piers7
  • The problem I would have is that a method that "if asked for 4 bytes, don't return until 4 bytes received" is not practical for the sort of system I need to run, which can't hang a thread while waiting for data. I need to be able to set partial data aside and return to it later, which requires a FIFO buffer as the other answers have suggested. (My main question was really 'why is there nothing like this in the standard libraries?') – Kylotan Mar 26 '15 at 13:02
0

There are only three answers here which provide code. One of them is clumsy and the others do not answer the question.

Here is a class that you can just copy and paste:

using System;
using System.Collections.Generic;

/// <summary>
/// This class is a very fast and thread-safe FIFO buffer
/// </summary>
public class FastFifo
{
    private List<Byte> mi_FifoData = new List<Byte>();

    /// <summary>
    /// Get the count of bytes in the Fifo buffer
    /// </summary>
    public int Count
    {
        get 
        { 
            lock (mi_FifoData)
            {
                return mi_FifoData.Count; 
            }
        }
    }

    /// <summary>
    /// Clears the Fifo buffer
    /// </summary>
    public void Clear()
    {
        lock (mi_FifoData)
        {
            mi_FifoData.Clear();
        }
    }

    /// <summary>
    /// Append data to the end of the fifo
    /// </summary>
    public void Push(Byte[] u8_Data)
    {
        lock (mi_FifoData)
        {
            // Internally the .NET framework uses Array.Copy() which is extremely fast
            mi_FifoData.AddRange(u8_Data);
        }
    }

    /// <summary>
    /// Get data from the beginning of the fifo.
    /// returns null if s32_Count bytes are not yet available.
    /// </summary>
    public Byte[] Pop(int s32_Count)
    {
        lock (mi_FifoData)
        {
            if (mi_FifoData.Count < s32_Count)
                return null;

            // Internally the .NET framework uses Array.Copy() which is extremely fast
            Byte[] u8_PopData = new Byte[s32_Count];
            mi_FifoData.CopyTo(0, u8_PopData, 0, s32_Count);
            mi_FifoData.RemoveRange(0, s32_Count);
            return u8_PopData;
        }
    }

    /// <summary>
    /// Gets a byte without removing it from the Fifo buffer
    /// returns -1 if the index is invalid
    /// </summary>
    public int PeekAt(int s32_Index)
    {
        lock (mi_FifoData)
        {
            if (s32_Index < 0 || s32_Index >= mi_FifoData.Count)
                return -1;

            return mi_FifoData[s32_Index];
        }
    }
}
Elmue
  • Just for posterity, this implementation boxes all the bytes written to it into objects so they can be added to the collection mi_FifoData. Considering this, the performance and memory footprint of this class will be horrible, if not fatal. – founderio Aug 10 '18 at 08:13
0

It sounds like you want to read from the socket into a MemoryStream buffer, and then 'pop' data out of the buffer and reset it each time a certain byte is encountered. It would look something like this:

// Assumes: using System; using System.IO; using System.Net.Sockets;
// END_OF_MESSAGE is whatever delimiter byte your protocol uses.
void ReceiveAllMessages(Action<byte[]> messageReceived, Socket socket)
{
    var currentMessage = new MemoryStream();
    var buffer = new byte[128];

    while (true)
    {
        var read = socket.Receive(buffer, 0, buffer.Length, SocketFlags.None);
        if (read == 0)
            break;     // Connection closed

        for (var i = 0; i < read; i++)
        {
            var currentByte = buffer[i];
            if (currentByte == END_OF_MESSAGE)
            {
                var message = currentMessage.ToArray();
                messageReceived(message);

                currentMessage = new MemoryStream();
            }
            else
            {
                currentMessage.WriteByte(currentByte);
            }
        }
    }
}
Paul Stovell
  • No, I don't need help with the parsing, just with the actual buffering, but thanks. :) – Kylotan Aug 19 '11 at 14:37
  • This won't work if messages are split into several packets. It must be implemented using a FIFO buffer, as OP asked. – vgru Aug 19 '11 at 14:40
  • It does - it will keep appending to the same MemoryStream until the END_OF_MESSAGE byte is encountered, which could be in the first or the 85th packet – Paul Stovell Aug 19 '11 at 14:43
  • [edit] Got it, I didn't see the infinite loop. But, you are still mixing two responsibilities in the same method: receiving and parsing of data. What if there is no "END_OF_MESSAGE" byte, but the length of each message depends on its contents (e.g. a specific starting cookie, and then the length information encoded inside the message)? This problem must be solved in two steps: 1. class which does nothing more but enqueues bytes to a FIFO buffer and 2. class which parses the data and dequeues it. – vgru Aug 19 '11 at 14:55
0

You could do this with a Stream wrapping a ConcurrentQueue<ArraySegment<byte>> (keep in mind this makes it forward only). However I really dislike the idea of keeping data in memory before doing something with it; it opens you up to a bunch of attacks (intentional or not) regarding the size of the message. You might also want to Google "circular buffer".

You should actually be writing code that does something meaningful with the data as soon as it is received: 'Push Parsing' (this is what, for example, SAX supports). As an example of how you would do this with text:

using System;
using System.Text;

// Push parser base class: feed it raw bytes and it pushes decoded
// characters out through OnCharacterData.
public abstract class PushTextReader
{
    private readonly Encoding _encoding;
    private readonly Decoder _decoder;
    private char[] _charData = new char[4];

    public PushTextReader(Encoding encoding)
    {
        _encoding = encoding;
        _decoder = _encoding.GetDecoder();
    }

    // A single connection requires its own decoder
    // and charData. That connection should never
    // call this method from multiple threads
    // simultaneously.
    // If you are using the ReadAsyncLoop you
    // don't need to worry about it.
    public void ReceiveData(ArraySegment<byte> data)
    {
        // The two 'false' parameters cause the decoder to accept
        // 'partial' characters (it keeps them for the next call).
        var charCount = _decoder.GetCharCount(data.Array, data.Offset, data.Count, false);
        if (_charData.Length < charCount)
            _charData = new char[charCount];    // grow the scratch buffer if needed
        charCount = _decoder.GetChars(data.Array, data.Offset, data.Count, _charData, 0, false);
        OnCharacterData(new ArraySegment<char>(_charData, 0, charCount));
    }

    // Implemented by the consumer to handle each chunk of decoded characters.
    protected abstract void OnCharacterData(ArraySegment<char> chars);
}
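
Building on the snippet above, a consumer might look something like this (the newline-delimited framing is purely an illustrative assumption):

// Hypothetical consumer: collects decoded characters and fires a callback per line.
public class LineReader : PushTextReader
{
    private readonly StringBuilder _current = new StringBuilder();
    private readonly Action<string> _onLine;

    public LineReader(Encoding encoding, Action<string> onLine) : base(encoding)
    {
        _onLine = onLine;
    }

    protected override void OnCharacterData(ArraySegment<char> chars)
    {
        for (int i = 0; i < chars.Count; i++)
        {
            char c = chars.Array[chars.Offset + i];
            if (c == '\n')
            {
                _onLine(_current.ToString());
                _current.Clear();
            }
            else if (c != '\r')
            {
                _current.Append(c);
            }
        }
    }
}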

If you must be able to accept complete messages before deserializing them, you can use a MemoryMappedFile, which has the advantage that the sending entity won't be able to out-of-memory your server. What gets tricky is resetting the file back to zero, because that can cause a bunch of issues. One way to tackle this:

TCP Receiver End

  1. Write to the current stream.
  2. If the stream exceeds a certain length move to a new one.

Deserialization End

  1. Read from the current stream.
  2. Once you have emptied the stream destroy it.

The TCP receiver end is very simple. The deserializer end will need some elementary buffer stitching logic (remember to use Buffer.BlockCopy and not Array.Copy).

Side note: this sounds like a fun project; if I have the time and remember, I might go ahead and implement this system.

Jonathan Dickinson
  • Jonathan, I don't see how parsing partial messages solves the security problem you mention because it's likely that the parsed object is larger than the raw bytes - attacks actually become more efficient that way. As it is, my messages have a size limit anyway, and the size is in the header so invalid messages are detected early. – Kylotan Aug 19 '11 at 15:24
  • @Kylotan - sending expensive messages to the server would work just as well: even if you restrict the total size of messages, one could send a message that would take e.g. 20s to execute and then just queue up a bunch of other ones - again, intentionally or unintentionally. – Jonathan Dickinson Aug 19 '11 at 16:32
  • Sure, but I don't see any intrinsic reason why such messages would be less expensive by parsing them piecemeal. – Kylotan Aug 19 '11 at 16:59
  • This is becoming a discussion. Not doing it is fine if you don't have your security hat on. – Jonathan Dickinson Aug 19 '11 at 17:08
  • I have my security hat on, but I don't agree that your way is more secure, sorry. Parsing data into a bigger representation makes you more likely to hit your resource limit, not less, and requires additional CPU and memory resources to parse messages that might turn out not to be fully-formed anyway. – Kylotan Aug 19 '11 at 19:22
  • The supposed speed advantage of `Buffer.BlockCopy` has been [debunked](https://stackoverflow.com/q/1389821/145173). – Edward Brey Mar 19 '19 at 01:29
  • @EdwardBrey that strongly depends on the version of .Net you're referring to (NB: answered Aug 19 '11). My own tests sometime prior to 2010 indicated otherwise: `Array.Copy` didn't have a fast-path for buffers. – Jonathan Dickinson Mar 25 '19 at 18:23
  • @JonathanDickinson Good point. "Obsolete" is a better way to put it. Your tests explain a vague memory of I have from back in the day of choosing `BlockCopy` to avoid loop overhead. – Edward Brey Mar 25 '19 at 18:28