2

I am trying to switch my ASP.NET application using .Net 6 from Stream to PipeReader as recommended by Microsoft. Here is my custom method:

private static async Task<byte[]> GetRequestBodyBytesAsync(PipeReader reader)
{
    byte[] bodyBytes;

    do
    {
        ReadResult readResult = await reader.ReadAsync();
        if (readResult.IsCompleted || readResult.IsCanceled)
        {
            ReadOnlySequence<byte> slice = readResult.Buffer.Slice(readResult.Buffer.Start, readResult.Buffer.End);
            bodyBytes = BuffersExtensions.ToArray(slice);
            reader.AdvanceTo(readResult.Buffer.End);
            break;
        }
    } while (true);

    return bodyBytes;
}

However when I use the above static method in my controller:

[HttpPost]
[Route(MyUrl)]
public async Task MyPostAsync()
{
    byte[] bodyBytes = await GetRequestBodyBytesAsync(Request.BodyReader);
    MyProtobuf myProtobuf = MyProtobuf.Parser.ParseFrom(bodyBytes);

then I can see in the debugger that the readResult.IsCompleted is never true and the readResult.Buffer stays the same 21 bytes.

Adding else { reader.AdvanceTo(readResult.Buffer.Start); } has not changed anything.

I need the complete byte array, so that I can pass it to Protobuf parsing method (there are no streaming parsers for protobuf)... how can I please use IO.Pipelines here?

UPDATE:

The following method (found in PipeReaderExtensions.cs and confirmed by Marc) works now for me and it even can be improved further by avoiding copying data around - see the great comments by Marc below.

private static async Task<byte[]> GetRequestBodyBytesAsync(PipeReader reader)
{
    do
    {
        ReadResult readResult = await reader.ReadAsync();
        if (readResult.IsCompleted || readResult.IsCanceled)
        {
            return readResult.Buffer.ToArray();
        }

        // consume nothing, keep reading from the pipe reader until all data is there
        reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
    } while (true);
}
Alexander Farber
  • 21,519
  • 75
  • 241
  • 416
  • Do you *need* to do this? What are you trying to achieve that your current solution isn't already fine? – Stuart.Sklinar Jun 09 '22 at 16:32
  • Yes, because I need to switch to Async calls anyway, since my current code occasionally throws `"System.InvalidOperationException: Synchronous operations are disallowed."` – Alexander Farber Jun 09 '22 at 18:30
  • 1
    Side note: your `.Slice` seems redundant - just `readBytes = readResult.Buffer.ToArray();` ? – Marc Gravell Jun 10 '22 at 11:00
  • 1
    Side note: I see you're dealing with protobuf; both Google.Protobuf and protobuf-net are, IIRC, happy to work with `ReadOnlySequence` - so you can probably avoid your `ToArray()` entirely, and perform the parse step inside the loop so you can use zero-copy processing (i.e. deserialize before calling `AdvanceTo`); if that isn't possible (for some reason), I would strongly suggest using a *leased* array rather than allocating a new one each time - perhaps return an `ArraySegment` or a `ReadOnlyMemory` – Marc Gravell Jun 10 '22 at 11:04
  • That is a great comment Marc, thank you! But actually I need the byte array to lookup a `WireType.VarInt` there - so that I know, which protobuf parser to use (the bad design by the customer forces me to lookup the numeric value in the `version` field first).... – Alexander Farber Jun 10 '22 at 11:11
  • 1
    @AlexanderFarber huh; I notice with surprise that `CodedInputStream` indeed doesn't offer you that; however, if you *want* a varint decoder that works against `ReadOnlySequence`: that's pretty trivial - happy to hand you one – Marc Gravell Jun 10 '22 at 11:17
  • Marc, that would be great - because currently me and my colleagues are using [this large hack](https://gist.github.com/afarber/459cfeb41a55f38488201d6ce2c46e80) in order to determine the `uint` value of the `Version` field, before passing the byte array to one of 4 possible protobuf parsers. Please send your example code to farber72@outlook.de – Alexander Farber Jun 10 '22 at 11:36
  • Marc, I have followed your advice and rewritten my application to use ReadOnlySequence - which I then pass to `ParseFrom()`. Unfortunately, I get exceptions like `Message Container doesn't provide the generated method that enables ParseContext-based parsing. You might need to regenerate the generated protobuf code.` as I have documented in [the issue 943 at Github](https://github.com/protobuf-net/protobuf-net/issues/943). – Alexander Farber Jul 28 '22 at 12:39

1 Answers1

2

You are meant to call AdvanceTo after every ReadAsync, so you should really have an else that does something like:

reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);

This says "I've checked everything, and consumed nothing" (at least semantically; it doesn't matter that you haven't really looked at anything - the point is to say "all of these bytes: aren't useful to me yet") - which means that ReadAsync now shouldn't try to give you anything else until it has some additional data or the data has ended. I'm a little surprised that it didn't throw an exception already, actually.

Marc Gravell
  • 1,026,079
  • 266
  • 2,566
  • 2,900
  • 1
    Thank you Marc, I have just found similar code in the [PipeReaderExtensions.cs](https://github.com/dotnet/aspnetcore/blob/main/src/SignalR/common/testassets/Tests.Utils/PipeReaderExtensions.cs) at the Github, which matches your answer (the `ReadAllAsync` method there). I still have difficulty to understand the parameter `examined` for the `AdvanceTo` method though - why does the library care about it at all? I understand the `consumed` parameter (as "dear library, you can throw away that data now, I have consumed or copied it"). But why the `examined` parameter? – Alexander Farber Jun 10 '22 at 11:07
  • 1
    @AlexanderFarber it isn't intuitive, perhaps, agreed; what it is *really* trying to help determine is: if you aren't making progress, is that because of *your reader*, or because *there isn't enough data*; consider, for example, that you're writing a deframer; you *could* choose to only deframe one (or zero) frame per call - it gives you 500 bytes, and you say "I've looked at 20 bytes and consumed 20 bytes"; now compare to a multi-frame deframer where there *wasn't enough data to consume a second frame": "I've looked at 500 bytes and consumed 20 bytes". I agree it feels like a `bool` might... – Marc Gravell Jun 10 '22 at 11:13
  • 1
    @AlexanderFarber ...have been more obvious here, i.e. `bool needMoreDataToMakeProgress` ! but the library is really just trying to know whether or not to offer you those remaining 480 bytes again next time you call `ReadAsync`, vs whether it should wait until it has something more to offer. – Marc Gravell Jun 10 '22 at 11:14