We can call ReadAsync() and examine a buffer for read bytes...

PipeReader reader = ...;
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
long availableBytes = buffer.Length;

If the length does not increase from before the call to ReadAsync, does that indicate the end of the pipe (no more bytes to read)? If not then what is the correct way of detecting 'end of pipe'?

We can signal consumption of bytes in the buffer like so:

reader.AdvanceTo(buffer.GetPosition(count)); // AdvanceTo takes a SequencePosition, not a byte count

And then check if there are any unconsumed bytes, or the possibility of future bytes being supplied (i.e. the producer has not yet signalled that it has stopped adding new bytes to the pipe)

readResult.IsCompleted

But if I am looking for a sequence (or sequences) in the buffer and waiting for a complete sequence before consuming it, then IsCompleted appears to remain false even when the buffer contains all of the available bytes and the producer has signalled completion.

Thanks.

redcalx
    "no more bytes to read" != "the end of the pipe"; they're only the same thing once the pipe knows that the underlying input is **closed**, i.e. no more data will *ever* arrive – Marc Gravell Sep 25 '19 at 08:23

1 Answer

.IsCompleted does indicate the end of the pipe, in the sense of a socket etc being closed (as opposed to being open but not having any more data right now); I expect what is happening here is that you're:

  • fetching a read buffer
  • reading the entire buffer looking for a sequence, and not finding it
  • therefore processing zero bytes
  • and therefore saying .AdvanceTo(zero)

There's an important second overload of AdvanceTo - you shouldn't just tell it what you consumed; you should tell it what you examined, which in this case is probably: everything; doing this can avoid you getting stuck in a hot loop parsing the same incomplete frame over and over and over and over and over. For example, one of my read loops looks like (simplified):

while (true)
{
    var readResult = await input.ReadAsync();
    var buffer = readResult.Buffer;
    int handled = TryConsume(ref buffer); // note: this **changes** buffer, slicing (Slice)
    // data from the start; when TryConsume exits, it will contain everything
    // that is *left*, but we will have effectively examined all of it; we will
    // have consumed any complete frames that we can from it

    // advance the pipe
    input.AdvanceTo(buffer.Start, buffer.End);

    // exit if we aren't making progress, and nothing else will be forthcoming
    if (handled == 0 && readResult.IsCompleted)
    {
        break; // no more data, or trailing incomplete messages
    }
}
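
To make the loop above concrete, here is a minimal, self-contained sketch. TryConsume is not shown in the answer, so the version below is purely hypothetical: it assumes a frame is any run of bytes terminated by '\n'. ReadLoopDemo and CountFramesAsync are likewise names invented for this illustration; only the Pipe, PipeReader, PipeWriter and ReadOnlySequence calls are real library APIs.

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

public static class ReadLoopDemo
{
    // Hypothetical parser (an assumption, not the answer's code): a frame is any
    // run of bytes ending in '\n'. Slices every complete frame off the front of
    // 'buffer' and returns how many frames were handled.
    public static int TryConsume(ref ReadOnlySequence<byte> buffer)
    {
        int handled = 0;
        while (true)
        {
            SequencePosition? newline = buffer.PositionOf((byte)'\n');
            if (newline == null) break; // only an incomplete frame (or nothing) is left

            // Skip past the frame and its one-byte delimiter.
            buffer = buffer.Slice(buffer.GetPosition(1, newline.Value));
            handled++;
        }
        return handled;
    }

    public static async Task<int> CountFramesAsync(PipeReader input)
    {
        int total = 0;
        while (true)
        {
            ReadResult readResult = await input.ReadAsync();
            ReadOnlySequence<byte> buffer = readResult.Buffer;
            int handled = TryConsume(ref buffer);
            total += handled;

            // consumed = start of what is left over; examined = everything we were given
            input.AdvanceTo(buffer.Start, buffer.End);

            // No progress and the writer has completed: nothing more will arrive.
            if (handled == 0 && readResult.IsCompleted) break;
        }
        await input.CompleteAsync();
        return total;
    }

    public static async Task Main()
    {
        var pipe = new Pipe();
        Task<int> reading = CountFramesAsync(pipe.Reader);

        await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("one\ntw"));
        await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("o\nthr")); // "thr" is never terminated
        await pipe.Writer.CompleteAsync();

        // Two complete frames; "thr" is a trailing incomplete message.
        Console.WriteLine(await reading);
    }
}
```

The exit test only fires when a pass handles nothing and IsCompleted is true, so a final read that still contains complete frames is drained first, and any unterminated tail ("thr" here) is dropped when the loop ends.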
Marc Gravell
  • Thanks Marc. Yes, I hadn't noticed the AdvanceTo() overload with the 'examined' position parameter. ok, so if my sequence detection ultimately fails to detect anything then I am tempted to 'rewind' the examined position, otherwise that position is latent state if/when the same PipeReader is passed to subsequent routines for further reading. I tried this and it appears to work fine - do you think this is correct/ok (an expected/normal pattern)? – redcalx Sep 25 '19 at 09:29
  • @redcalx there's a chance you could end up in a hot loop doing that; the preferred approach is to *declare* that you're not making progress, by telling it what you examined, which is: everything. I don't know whether it was ever implemented, but during the early discussions on this API it was *proposed* that if you fail to make the same progress *twice* (i.e. if you see the same buffer and report 0/0), the implementation assumes you're just using it wrong, and backs off until there is more data; maybe that is saving you? – Marc Gravell Sep 25 '19 at 17:15
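
The difference between "rewinding" and reporting everything as examined can be observed directly. The sketch below is only an illustration against the default in-memory Pipe (ExaminedDemo and RunAsync are made-up names, and other PipeReader implementations may behave differently): it checks whether the next ReadAsync completes synchronously after each style of AdvanceTo.

```csharp
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class ExaminedDemo
{
    public static async Task<(bool afterRewind, bool afterExamined)> RunAsync()
    {
        var pipe = new Pipe();
        await pipe.Writer.WriteAsync(new byte[] { 1, 2, 3 }); // stands in for an incomplete frame

        // Case 1: "rewind". Nothing consumed and nothing reported as examined.
        ReadResult first = await pipe.Reader.ReadAsync();
        pipe.Reader.AdvanceTo(first.Buffer.Start);
        ValueTask<ReadResult> second = pipe.Reader.ReadAsync();
        bool afterRewind = second.IsCompleted; // true here means the same bytes come straight back
        ReadResult again = await second;

        // Case 2: nothing consumed, but everything reported as examined.
        pipe.Reader.AdvanceTo(again.Buffer.Start, again.Buffer.End);
        ValueTask<ReadResult> third = pipe.Reader.ReadAsync();
        bool afterExamined = third.IsCompleted; // false here means the read waits for the writer

        // Completing the writer releases the pending read with IsCompleted == true.
        await pipe.Writer.CompleteAsync();
        ReadResult last = await third;
        pipe.Reader.AdvanceTo(last.Buffer.End);
        await pipe.Reader.CompleteAsync();

        return (afterRewind, afterExamined);
    }

    public static async Task Main()
    {
        var (afterRewind, afterExamined) = await RunAsync();
        Console.WriteLine($"rewind returned immediately: {afterRewind}");
        Console.WriteLine($"examined-all returned immediately: {afterExamined}");
    }
}
```

With the default Pipe I would expect the rewind case to return immediately with the same three bytes, which is the hot-loop risk described above, and the examined-all case to stay pending until the writer flushes more data or completes.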