2

I have the following function that returns the standard output data, as an async stream, that results from running a System.Diagnostics.Process. Everything currently in the method works as intended; I can call it in an await foreach() loop and I get each line of output as its generated by the external exe.

private static async IAsyncEnumerable<string> ProcessAsyncStream (
    ProcessStartInfo processStartInfo)
{
   // Ensure that process is destroyed when this method exits
   using var process = new Process() { StartInfo = processStartInfo };

   // Buffer used to pass data from event-handler back to this method
   BufferBlock<string> dataBuffer = new BufferBlock<string>();

   process.OutputDataReceived += (s, e) =>
   {
      if (e.Data is null)
      {
         dataBuffer.Complete();
      }
      else
      {
         dataBuffer.Post(e.Data);
      }
   };

   // Start process and redirect output streams
   process.Start();
   process.BeginOutputReadLine();

   // Return data line by line  
   while (await dataBuffer.OutputAvailableAsync())
      yield return dataBuffer.Receive();
}

My problem is that now I need it to return both the standard output and standard error results. I made this class to hold the data from each stream.

public class ProcessData
{
   public string Error { get; set; } = "";
   public string Output { get; set; } = "";
}

and changed ProcessAsyncStream() to look like this

private static async IAsyncEnumerable<ProcessData> ProcessAsyncStream (
    ProcessStartInfo processStartInfo)
{
   // Ensure that process is destroyed when this method exits
   using var process = new Process() { StartInfo = processStartInfo };

   // Buffer used to pass data from event-handlers back to this method
   BufferBlock<string> outputDataBuffer = new BufferBlock<string>();
   BufferBlock<string> errorDataBuffer = new BufferBlock<string>();

   
   process.OutputDataReceived += (s, e) =>
   {
      if (e.Data is null)
      {
         outputDataBuffer.Complete();
      }
      else
      {
         outputDataBuffer.Post(e.Data);
      }
   };

   process.ErrorDataReceived += (s, e) =>
   {
      if (e.Data is null)
      {
         errorDataBuffer.Complete();
      }
      else
      {
         errorDataBuffer.Post(e.Data);
      }
   };

   // Start process and redirect output streams
   process.Start();
   process.BeginOutputReadLine();

   // Return data line by line
   while (await outputDataBuffer.OutputAvailableAsync()
          || await errorDataBuffer.OutputAvailableAsync())
      yield return new ProcessData() 
      {
         Error = errorDataBuffer.Receive(), 
         Output = outputDataBuffer.Receive()
      }
}

The problem is that if either buffer completes before the other than the method hangs up because that buffer's .Receive() doesn't have any data to receive. If I change the while condition to && then I won't get all the data from the other buffer.

Any suggestions?

Peter Csala
  • 17,736
  • 16
  • 35
  • 75
master_ruko
  • 629
  • 1
  • 4
  • 22
  • Would `Error = errorDataBuffer.OutputAvailableAsync() ? errorDataBuffer.Receive() : null` (and similarly for Output) work for you? – Klaus Gütter Nov 04 '20 at 06:28
  • You are checking the `outputDataBuffer.OutputAvailableAsync()` twice in the last `while` loop. Is this intentional or a bug? – Theodor Zoulias Nov 04 '20 at 06:30
  • This is a strange solution you have here. Also shouldn't you be using `TryReceive` – TheGeneral Nov 04 '20 at 06:32
  • @TheodorZoulias oops, that was just a mistype. I did some format editing in the SO editor and am half asleep and made a mistake. I am not doing that in my real code. – master_ruko Nov 04 '20 at 06:33
  • @KlausGütter Yes, I believe that would work. I can't believe I didn't think of that. That's what I get for programming while sleep deprived. Thanks – master_ruko Nov 04 '20 at 06:34
  • @TheGeneral Why is it a strange solution? And yes, I probably should be. – master_ruko Nov 04 '20 at 06:35
  • OK, no problem. Could you fix this mistype? I am not sure how to fix it myself. Btw the pattern you use for converting a `BufferBlock` to an `IAsyncEnumerable` is not optimal, and is not safe for multiple consumers. Look [here](https://stackoverflow.com/questions/49389273/for-a-tpl-dataflow-how-do-i-get-my-hands-on-all-the-output-produced-by-a-transf/62410007#62410007) for the correct pattern (the `ToAsyncEnumerable` extension method). – Theodor Zoulias Nov 04 '20 at 06:39
  • Regarding the condition of the stream completion, do you want it to complete when both events (`OutputDataReceived` and `ErrorDataReceived`) have been triggered with `e.Data is null`? Alternatively you could complete the stream on the [`Exited`](https://learn.microsoft.com/en-us/dotnet/api/system.diagnostics.process.exited) event. – Theodor Zoulias Nov 04 '20 at 06:50
  • Yes. I want both ```e.Data```'s to be null. I want all the data that this process has to offer. – master_ruko Nov 04 '20 at 06:53
  • @TheodorZoulias So just by moving the ```while``` to an extension method its all of the sudden safe for multiple consumers? I don't understand this. – master_ruko Nov 04 '20 at 07:05
  • 1
    master_ruko no, what makes it safe is the use of the `TryReceive` method. In a multiple-consumers scenario it is possible to get an `InvalidOperationException` if you call `Receive` after awaiting the `OutputAvailableAsync`. Also by calling the `TryReceive` in a `while` loop you may get better performance in high throughput scenarios, because the `OutputAvailableAsync` is relatively expensive. – Theodor Zoulias Nov 04 '20 at 07:13
  • @TheodorZoulias Ah, I see. I was unaware of this, as I haven't had that problem yet. – master_ruko Nov 04 '20 at 07:23
  • 1
    As a side note, in case you are interested about performance, the [Channels](https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/) are considered superior to the `BufferBlock`. They use `ValueTask`s internally, and as a result they are less allocatey. Also propagating a `struct ProcessData` instead of a `class` could be beneficial too. – Theodor Zoulias Nov 04 '20 at 07:25

3 Answers3

3

Regarding the actual problem, there is an issue with the process flow of reading the blocks. The easiest solution is to just use a single buffer with multiple producers and a single consumer combined with a message packet.

The conceptual issue that you are trying to solve with the DataFlow blocks is in the fundamental nature of events an async streams. Events are pushed, and async streams are pulled.

There are several solutions that would map them together, though I think the most elegant would be just to use an Unbounded Channel as the buffer.

Channels are more modern approach than DataFlow, have less degrees of freedom, less clunky then a BufferBlock, and very lightweight and highly optimized. Additionally, I would just pass a wrapper for the different response types.

Disregarding any other problem (conceptual or otherwise).

Given

public enum MessageType
{
   Output,
   Error
}

public class Message
{
   public MessageType MessageType { get; set; }
   public string Data { get; set; }

   public Message(string data, MessageType messageType )
   {
      Data = data;
      MessageType = messageType;
   }
}

Usage

private async IAsyncEnumerable<Message> ProcessAsyncStreamAsync(
     ProcessStartInfo processStartInfo, 
     CancellationToken cancellationToken)
{
   using var process = new Process() { StartInfo = processStartInfo };

   var ch = Channel.CreateUnbounded<Message>();
   var completeCount = 0;

   void OnReceived(string data, MessageType type)
   {
      // The Interlocked memory barrier is likely overkill here
      if (data is null && Interlocked.Increment(ref completeCount) == 2)
         ch?.Writer.Complete();
      else
         ch?.Writer.WriteAsync(new Message(data, type), cancellationToken);
   }

   process.OutputDataReceived += (_, args) => OnReceived(args.Data, MessageType.Output);
   process.ErrorDataReceived += (_, args) => OnReceived(args.Data, MessageType.Error);

   // start the process 
   // ...

   await foreach (var message in ch.Reader
           .ReadAllAsync(cancellationToken)
           .ConfigureAwait(false))
      yield return message;

   // cleanup
   // ...
}

Note : completely untested

TheGeneral
  • 79,002
  • 9
  • 103
  • 141
  • Can I set the ```UnboundedChannelOptions.SingleReader``` option to true in this case? I was reading up on channels, as this is a new thing for me, and read that setting these options can help to optimize performance in some cases. I'm going to try it, cuz I think its appropriate. I'm just not sure if there's something I'm not aware of that might cause any issues. – master_ruko Nov 05 '20 at 19:26
  • Also, I've never used a cancelation token. Should I register a delegate to stop the process if the cancelation token is activated? – master_ruko Nov 05 '20 at 21:52
  • @master_ruko Yes single reader is indeed appropriate here – TheGeneral Nov 05 '20 at 23:00
  • @master_ruko the token with throw in the channel methods. If you need to clean up on token cancellation, you can either catch the exception Operation Cancelled. Or register with the the token itself. I think in this circumstance id go with catch or maybe even a finally to do your clean up and let the exception propagate – TheGeneral Nov 05 '20 at 23:03
1

Complete on exit instead.

void HandleData(object sender, DataReceivedEventArgs e)
{
    if (e.Data != null) dataBuffer.Post(e.Data);
}

process.OutputDataReceived += HandleData;
process.ErrorDataReceived += HandleData;
process.Exited += (s,e) => 
{
    process.WaitForExit();
    dataBuffer.Complete();
};
John Wu
  • 50,556
  • 8
  • 44
  • 80
  • 1
    While I'm sure that `Process` always raises events in the expected order, I see no reason to expect the Windows thread scheduler to guarantee that the `Exited` event is always raised after the last `HandleData()` call has been made. These events are raised asynchronously in the thread pool, and not synchronized with each other, so it's always possible that there will be another call to `HandleData()` after the `Exited` event is raised. – Peter Duniho Nov 04 '20 at 09:04
  • In other words, I see why this approach would be tempting, but it seems like one of the worst options (not even counting that the OP keeps the stdout and stderr data streams separated, while the above does not provide for that...that seems relatively simple to fix though). – Peter Duniho Nov 04 '20 at 09:06
  • I did an experiment and verified Peter Duniho's comment. The `Exited` is indeed invoked before all handlers of the `OutputDataReceived` and `ErrorDataReceived` events have completed. The [documentation](https://learn.microsoft.com/en-us/dotnet/api/system.diagnostics.process.outputdatareceived#remarks) indicates that calling the `WaitForExit` method should be sufficient to ensure that all handlers have completed, and my experiment confirmed that. So an easy fix could be to call the `WaitForExit` before completing the buffer (inside the `Exited` event handler). – Theodor Zoulias Nov 04 '20 at 10:42
  • 1
    @TheodorZoulias Thanks. Revised the code per your suggestion. – John Wu Nov 04 '20 at 16:22
  • 1
    Unfortunately, the suggestion from @TheodorZoulias does not work. The documentation [hints at this](https://learn.microsoft.com/en-us/dotnet/api/system.diagnostics.process.waitforexit#System_Diagnostics_Process_WaitForExit), though is not 100% explicit: _"ensures that all processing has been completed, including the handling of asynchronous events"_. The `WaitForExit()` method waits for _all_ event handlers to return, including the `Exited` handler. If you call it from that handler (or any other), the process will deadlock. – Peter Duniho Nov 04 '20 at 18:18
  • @PeterDuniho good point. While doing my tests I didn't experience any deadlocks, which indicates that the handlers of the `Exited` event may not affect the behavior of the `WaitForExit`. But it should be easy to remove this risk factor, by offloading the `WaitForExit()`+`buffer.Complete()` to another thread (with `Task.Run`). This way the `Exited` handler will not be blocked. – Theodor Zoulias Nov 04 '20 at 18:46
  • @TheodorZoulias: well, I _did_ test it, and having a call for `WaitForExit()` in the `Exited` handler causes deadlock, at least on .NET Framework. Yes, pushing that off to the thread pool can solve the deadlock issue, but it further complicates the solution. The primary appeal of the code above is its simplicity; the channels approach is IMHO better overall, and once the above has been modified to solve all the problems (e.g. having to call `WaitForExit()`, in a task, and fixing the handlers so that out and err are distinguished), you get something not dissimilar to the channels approach. – Peter Duniho Nov 04 '20 at 18:51
  • @PeterDuniho my tests were on the .NET Core 3.1.3 (no deadlock). I just tested on .NET Framework 4.8.3801, and didn't observe a deadlock either. It does hate calling `WaitForExit` twice from different threads though. One of the calls throws a random exception. Indeed this complication reduces the appeal of this solution. But choosing a `Channel` over a `BufferBlock` makes no difference regarding the main issue, which is finding the correct moment to mark the completion of the async queue. Depending on receiving `null` `e.Data` from the output/error handlers is not terribly appealing either. – Theodor Zoulias Nov 04 '20 at 19:31
  • @TheodorZoulias: your concern about the null values is unfounded. That's well-defined behavior. The event handlers are never going to be called more than once with null. The null specifically marks the end of the stream, and thus the last time the handler will be called. – Peter Duniho Nov 04 '20 at 19:34
  • @PeterDuniho this behavior is well-defined experimentally, but is it documented? I haven't seen it described in the documentation so far. – Theodor Zoulias Nov 04 '20 at 19:37
  • 1
    @TheodorZoulias: The doc is not explicit, granted. But that doesn't mean that that's not what the documentation says. [The documentation says](https://docs.microsoft.com/en-us/dotnet/api/system.diagnostics.datareceivedeventargs.data#remarks) that _"When the redirected stream is closed, a null line is sent to the event handler"_. It doesn't say that `null` is sent any other time. It's only sent _when the stream is closed_. How many times do you think the stream is closed? It would be pretty silly to expect the documentation to always call out all the situations when something _doesn't_ happen. – Peter Duniho Nov 04 '20 at 19:47
  • @PeterDuniho OK, agreed. This excerpt from the documentation is convincing enough for me. :-) – Theodor Zoulias Nov 04 '20 at 19:50
  • @PeterDuniho to be fair though, having to count the nulls coming from the two events in order to signal the completion of the combined stream (like the General does in their [answer](https://stackoverflow.com/a/64676176/11178549)) looks a bit sketchy. It makes me thing that the `Process` class lacks a proper mechanism for awaiting its completion, something like the `WaitForExit` but asynchronous. The `Exited` event, although [advertised](https://docs.microsoft.com/en-us/dotnet/api/system.diagnostics.process.exited#remarks) as such, is not equivalent because it is triggered prematurely. – Theodor Zoulias Nov 06 '20 at 07:49
0

You could use a single buffer of ProcessData items:

var buffer = new BufferBlock<ProcessData>();

Then use a custom Complete mechanism to complete the buffer when both events have propagated a null value:

process.OutputDataReceived += (s, e) =>
{
    if (e.Data is null) Complete(1);
        else buffer.Post(new ProcessData() { Output = e.Data });
};

process.ErrorDataReceived += (s, e) =>
{
    if (e.Data is null) Complete(2);
        else buffer.Post(new ProcessData() { Error = e.Data });
};

Here is an implementation of the Complete method:

bool[] completeState = new bool[2];
void Complete(int index)
{
    bool completed;
    lock (completeState.SyncRoot)
    {
        completeState[index - 1] = true;
        completed = completeState.All(v => v);
    }
    if (completed) buffer.Complete();
}
Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104