
I'm using two ReadAsync() calls, and Task.WhenAny() to handle two NetworkStreams (TcpClient).

Will the following await code miss any data-capture?

  • Q1 in code: What happens if both streams have new data at EXACTLY the same time?
  • Q2 in code: Is there any chance of the WriteAsync() taking too long and losing the stored-buffer?
  • Q3: Is there a "better" way to tackle this?

I am writing a piece of code that is intended to act as a man-in-the-middle filter of a TCP stream (to later allow filtering/monitoring of certain packets)

The generalised logic should be:

  • Client establishes connection to filter, then filter establishes new connection to chosen server
  • If any data arrives from the client, save it and send it to the server
  • If any data arrives from the server, save it and send it to the client

Error-handling exists (where listed). Have I missed anything important?


I used the following answer to a question about ".Net 4.5 Async Feature for Socket Programming" as a starting point:

var read_task_from_client = rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
var read_task_from_server = tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);

try
{
  while (true)
  {
     Task<int> read_task_occurred;
     try
     {
        read_task_occurred = await Task.WhenAny(read_task_from_client, read_task_from_server);
            //Q1: What happens if both streams have new data at EXACTLY the same time?

        if (read_task_occurred.Status != TaskStatus.RanToCompletion)
        {
          Trace.WriteLine(string.Format("[{0}] - Task failure: {1}", ID, read_task_occurred.Status));
          break;
        }
     }
     catch (AggregateException aex)
     {
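        // The individual failures are in InnerExceptions; aex.Data is an unrelated key/value dictionary.
        for (int i = 0; i < aex.InnerExceptions.Count; i++)
        {
          var aex_item = aex.InnerExceptions[i];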
          Trace.WriteLine(string.Format("[{0}] - Aggregate failure {1} - {2}", ID, i, aex_item));
        }
        break;
     }

     var bytes_read = read_task_occurred.Result;
     if (read_task_occurred.Result == 0)
     {
        // If a read-operation returns zero, the stream has closed.
        OneStreamHasClosed(read_task_from_client, read_task_from_server, read_task_occurred);
        break;
     }

     if (read_task_occurred == read_task_from_client)
     {
        BytesFromClient += read_task_from_client.Result;
        Trace.WriteLine(string.Format("[{0}] - Client-to-Server: {1}", ID, bytes_read));
        await tx_stream.WriteAsync(rx_buffer, 0, bytes_read);
        await FileStream_Incoming.WriteAsync(rx_buffer, 0, bytes_read);
            // Q2: Any chance of the WriteAsync taking too long?
            //    (e.g. rx_buffer begins to be filled again before being written to tx_stream or the filestream)

        read_task_from_client = rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
     }
     else if (read_task_occurred == read_task_from_server)
     {
        BytesFromServer += read_task_from_server.Result;
        Trace.WriteLine(string.Format("[{0}] - Server-to-Client: {1}", ID, bytes_read));
        await rx_stream.WriteAsync(tx_buffer, 0, bytes_read);
        await FileStream_Outgoing.WriteAsync(tx_buffer, 0, bytes_read);

        read_task_from_server = tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);
     }
  }
}
finally
{
  FileStream_Incoming.Close();
  FileStream_Outgoing.Close();
}

So far, this seems to work as expected, capturing and logging multiple streams. However, I'm not certain if I'm using the await statements safely.

This will later run in multiple threads (possibly one per Incoming-Connection, but that's a separate topic)

Update (to Q2 in code)

By refactoring the original "await tx_stream.Write..." and "await xxx_FileStream.Write..." as follows, I believe that I have been able to remove one main race-condition at Q2. I am still not sure if this is the "best/recommended" solution:

// Code changed to a call to MultiWrite
private void MultiWrite(byte[] buffer, int bytes_read, Stream s1, Stream s2)
{
  Task writer1 = s1.WriteAsync(buffer, 0, bytes_read);
  Task writer2 = s2.WriteAsync(buffer, 0, bytes_read);
  Task.WaitAll(writer1, writer2);
}

Update 2 (code-testing of await)

I've been told that await does not allow concurrent tasks to run... This puzzles me since I then cannot understand how/why the following could run...

private async Task<char> SimpleTask(char x, int sleep_ms) { return await Task.Run(() => { Console.Write(x); Thread.Sleep(sleep_ms); return x; }); }
internal async void DoStuff()
{
  var a_task = SimpleTask('a', 100);
  var b_task = SimpleTask('b', 250);
  var c_task = SimpleTask('c', 333);

  while (true)
  {
    var write_task_occurred = await Task.WhenAny(a_task, b_task, c_task);
    var char_written = write_task_occurred.Result;
    switch (char_written)
    {
      case 'a': a_task = SimpleTask('a', 100); break;
      case 'b': b_task = SimpleTask('b', 250); break;
      case 'c': c_task = SimpleTask('c', 333); break;
    }
  }
}

The snippet above does run (and, as I would expect, produces the following multi-threaded nonsense):

aabacabaacabaacbaaabcaabacaabacabaabacaabacabaacabaacbaabacaabacabaacabaabacaab

Can anyone explain where/why the above approach is wrong and, if so, how it could be improved?


Update 3: split logic into two methods

I have integrated the "write to output-stream and a file, ensure both outputs have the data in 'buffer' before further Read()" and have also split the code to call MultiWrite() as per my earlier update to Q2:

As per the suggestion(s) by @usr and @Pekka, I have split the code into two methods as below...

private void ProcessStreams_Good()
{
  Task t1 = CopyClientToServer(), t2 = CopyServerToClient();

  Trace.WriteLine(string.Format("[{0}] - Data stats: C={1}, S={2}", ID, BytesFromClient, BytesFromServer));
  Trace.WriteLine(string.Format("[{0}] - connection closed from {1}", ID, Incoming.Client.RemoteEndPoint));
}
private async void ProcessStreams_Broken()
{
  await CopyClientToServer(); await CopyServerToClient();

  Trace.WriteLine(string.Format("[{0}] - Data stats: C={1}, S={2}\r\n", ID, BytesFromClient, BytesFromServer));
  Trace.WriteLine(string.Format("[{0}] - connection closed from {1}", ID, Incoming.Client.RemoteEndPoint));
}

private async Task CopyClientToServer()
{
  var bytes_read = await rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
  while (bytes_read > 0)
  {
    BytesFromClient += bytes_read; Trace.WriteLine(string.Format("[{0}] - Client-to-Server: {1}", ID, bytes_read));
    MultiWrite(rx_buffer, bytes_read, tx_stream, FileStream_FromClient);
    bytes_read = await rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
  }
}
private async Task CopyServerToClient()
{
  var bytes_read = await tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);
  while (bytes_read > 0)
  {
    BytesFromServer += bytes_read; Trace.WriteLine(string.Format("[{0}] - Server-to-Client: {1}", ID, bytes_read));
    MultiWrite(tx_buffer, bytes_read, rx_stream, FileStream_FromServer);
    bytes_read = await tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);
  }
}

Yes, I am aware of the reason why ProcessStreams_Broken() fails and ProcessStreams_Good() works as expected.

Q: This new code is slightly neater, but is it any "better" ?


Late Update (after question closed)

After the question closed, I came across a Best Practices for async/await link which was quite helpful.

Steven_W

2 Answers


await and WhenAny do not start any operation. They merely wait for a running operation to complete. All reads that you have started will complete eventually and data will be taken from the streams. This is true whether you observe the result or not.
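To make that point concrete, here is a minimal sketch. The names `AwaitDemo`, `WorkAsync` and `gate` are hypothetical and only serve the illustration; `gate` stands in for a pending I/O operation. The operation begins when the async method is called, and the later `await` only observes its outcome.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AwaitDemo
{
    // Hypothetical operation: logs when it starts, then waits on 'gate'
    // (a stand-in for I/O that has not completed yet).
    static async Task<int> WorkAsync(List<string> log, Task gate)
    {
        log.Add("work started");
        await gate;
        log.Add("work finished");
        return 42;
    }

    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        var gate = new TaskCompletionSource<bool>();

        Task<int> t = WorkAsync(log, gate.Task); // the operation starts here, at the call
        log.Add("caller continues");             // nothing has been awaited yet

        gate.SetResult(true);                    // the simulated I/O completes
        int result = await t;                    // await only waits for and unwraps the result
        log.Add("await returned " + result);
        return log;
    }
}
```

The log ends up as "work started", "caller continues", "work finished", "await returned 42": the work was already under way before any `await` was reached.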

I understand you want to relay data from client to server and from server to client. So why not start two async methods concurrently, each of which does one of the two relay directions? That removes the need for WhenAny and all that complicated logic. You need to throw this away.

Q1 in code: What happens if both streams have new data at EXACTLY the same time?

You do not need the answer to that question. You must handle the completion of all reads that you start no matter when they complete. Else, you lose data. Maybe you were assuming that non-complete outstanding reads were (somehow) cancelled and only one read was actually "taking"?! That is not the case. All reads complete. There is no way to cancel one (without dropping the data).
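As a rough illustration, assuming a hypothetical `FakeReadAsync` helper in place of the real `ReadAsync` calls: `Task.WhenAny` returns whichever task finishes first, but the other task keeps running and its result still has to be consumed.

```csharp
using System;
using System.Threading.Tasks;

public static class WhenAnyDemo
{
    // Hypothetical stand-in for ReadAsync: completes after delayMs and reports a byte count.
    public static async Task<int> FakeReadAsync(int delayMs, int bytes)
    {
        await Task.Delay(delayMs);
        return bytes;
    }

    public static async Task<int[]> RunAsync()
    {
        Task<int> a = FakeReadAsync(10, 5); // both "reads" are started here
        Task<int> b = FakeReadAsync(50, 7);

        Task<int> first = await Task.WhenAny(a, b);
        int firstBytes = await first;

        // WhenAny did not cancel the other read. It is still in flight (or already done),
        // and whatever it delivers must be handled as well, otherwise that data is lost.
        Task<int> other = ReferenceEquals(first, a) ? b : a;
        int otherBytes = await other;

        return new[] { firstBytes, otherBytes };
    }
}
```

Whichever task wins, both byte counts are eventually delivered, which is why the order of completion does not matter as long as every started read is handled.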

Q2 in code: Is there any chance of the WriteAsync() taking too long and losing the stored-buffer?

Not sure what you mean. If a timeout occurs you need a strategy for dealing with that. Usually, you'd log the error and shut down.
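One possible way to detect a write that takes too long is sketched below. `TimeoutSketch` and `CompletesWithinAsync` are hypothetical names, and racing the operation against `Task.Delay` is just one common pattern, not something the question's code already does.

```csharp
using System;
using System.Threading.Tasks;

public static class TimeoutSketch
{
    // Returns true if 'operation' finished within 'timeout', false otherwise.
    // Exceptions from a finished operation are rethrown to the caller.
    public static async Task<bool> CompletesWithinAsync(Task operation, TimeSpan timeout)
    {
        Task finished = await Task.WhenAny(operation, Task.Delay(timeout));
        if (finished != operation)
        {
            // The operation is NOT cancelled here; it is still pending and may still
            // be using its buffer. The safe reaction is to log and shut the connection down.
            return false;
        }
        await operation; // observe any exception the operation produced
        return true;
    }
}
```

A caller might use it as `if (!await TimeoutSketch.CompletesWithinAsync(tx_stream.WriteAsync(rx_buffer, 0, bytes_read), TimeSpan.FromSeconds(30))) { /* log and close */ }`, where the 30-second limit is an arbitrary example value. The buffer should not be reused after a timeout, because the pending write may still read from it.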

usr
  • Re "Q1" - Using await for a single operation seemed straight forward - There seemed to be less information on using concurrent awaits then handling callbacks for "client" or "server" (or other streams) - Quite happy to throw this code-snippet away if I can find something { better / clearer } – Steven_W Mar 02 '15 at 14:00
  • Re "Q2", I was thinking of the following sequence: 1: rx_buffer=Read(), gets 10 bytes; 2a: WriteAsync to tx_stream; 2b: WriteAsync to file; 3: code loops back to fill rx_buffer before steps 2a+2b have both completed (this would corrupt the contents of the buffer) – Steven_W Mar 02 '15 at 14:03
  • Re "Q2" buffer - I assume that I could "improve" this by ensuring I called Task.WaitAll(2a,2b) before I tried reading more data ... But then the code starts to get too "messy" with multiple "edge-cases" ... I've not yet been able to find a good "reference example" that I can learn from. – Steven_W Mar 02 '15 at 14:07
  • I want to make sure you understand what await does because this is a common problem that people have. await does not start any operation and it does not introduce concurrency. It waits for something that already runs. Do you fully understand this? – usr Mar 02 '15 at 14:07
  • I understand that await does not necessarily "instantly spawn a new thread and run instantly", but that it does allow concurrent Tasks to run, and provides a cleaner approach to using BeginRead()/EndRead() with callbacks.. I'm not sure exactly what you mean by "introduce concurrency" (since the whole purpose of await seems to be to "allow concurrency to occur) .... Any URL references/examples of what you mean (or the difference between the two) ?? – Steven_W Mar 02 '15 at 14:16
  • await does not allow concurrent tasks to run. They already run when await starts to operate. Think of await as `Task.Wait` but releasing the current thread. Report back when you have researched this issue and understand it. I can't help you without this understanding. – usr Mar 02 '15 at 14:45
  • Btw if you assume that await starts its operand concurrently your code is full of race conditions and would not work at all... Even the WhenAny would be started concurrently and not actually wait. This assumption must be false. – usr Mar 02 '15 at 14:53
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/72070/discussion-between-steven-w-and-usr). – Steven_W Mar 02 '15 at 15:21
  • @Steven_W in your new code `ProcessStreams_Broken` you first copy client to server and *then* in the other direction. You still have not understood what await does. As I said: Think of it as calling Wait. That model explains why this cannot work. – usr Mar 03 '15 at 18:33
  • Agreed ...That is why I labelled it "Broken", and stated "I **am** aware of the reason why `ProcessStreams_Broken()` fails" --- Whilst I DO understand what **await** is for, I was hoping for advice / "recommendations on good-practice" / "suggestions for improvement" for situations like this that involve multiple I/O tasks (that need to be concurrent, but not necessarily each on their own thread) rather than just telling me that my code cannot work.... – Steven_W Mar 03 '15 at 22:37
  • You're right, I overlooked that. Honestly, there is quite a lot of stuff in the question now... Anyway, ProcessStreams_Good is quite "good" except that it starts the IO work and immediately returns. That's probably not intended. Add `await Task.WhenAll(t1, t2);` and make it return `Task`. The key insight here is that the two relay directions are independent and can run as independent tasks. The "good" version of the code is now best-practice. Correction: MultiWrite should probably be async as well. – usr Mar 03 '15 at 22:42
  • Thanks - Was difficult to keep the question-size down whilst still keeping the clarity.. – Steven_W Mar 03 '15 at 22:55
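The changes suggested in the comments above (await `Task.WhenAll`, return `Task`, make MultiWrite async) could look roughly like the sketch below. It is written against plain `Stream` parameters so that it is self-contained; `RelaySketch`, `CopyAsync`, `MultiWriteAsync`, `ProcessStreamsAsync` and the 8192-byte default buffer are hypothetical choices for illustration, not the asker's actual members.

```csharp
using System.IO;
using System.Threading.Tasks;

public static class RelaySketch
{
    // Start both writes, then wait for both without blocking a thread.
    public static Task MultiWriteAsync(byte[] buffer, int count, Stream s1, Stream s2)
    {
        return Task.WhenAll(s1.WriteAsync(buffer, 0, count), s2.WriteAsync(buffer, 0, count));
    }

    // One relay direction. Each direction owns its buffer, and the next read
    // is only issued after both writes of the previous chunk have completed.
    public static async Task<long> CopyAsync(Stream source, Stream destination, Stream log, int bufferSize = 8192)
    {
        var buffer = new byte[bufferSize];
        long total = 0;
        int n;
        while ((n = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            await MultiWriteAsync(buffer, n, destination, log);
            total += n;
        }
        return total; // a read of zero bytes means the source was closed
    }

    // Both directions run as independent tasks; the method completes when both have ended.
    public static async Task<long[]> ProcessStreamsAsync(Stream client, Stream server, Stream logFromClient, Stream logFromServer)
    {
        Task<long> clientToServer = CopyAsync(client, server, logFromClient);
        Task<long> serverToClient = CopyAsync(server, client, logFromServer);
        return await Task.WhenAll(clientToServer, serverToClient);
    }
}
```

Because each direction awaits its own writes before reading again, the buffer-reuse race from Q2 does not arise, and the byte counts are returned rather than accumulated in shared fields. This sketch assumes the streams allow one reader and one writer at the same time, which is the documented behaviour of `NetworkStream`; other stream types may not.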

Concurrency is all about non-determinism. The two endpoints of the channel necessarily have separate clocks and cannot tell which message you received first (within clock jitter). Provided you (and the entire OS stack) fairly act on received messages and forward them, then the sequence in which this happens is not relevant.

If you want to avoid any bias, then design the code so that it does not introduce any preference for either direction. For example, your test Task.WhenAny(read_task_from_client, read_task_from_server); is likely biased towards one of the tasks. Use the suggestion by @usr to create separate methods to avoid this.

Finally, be really careful when tearing down the sessions. It is not possible to exactly emulate all the possible cases to abruptly tear down from user code that one of the endpoints may do. Your emulation fidelity will be challenged by this and may invalidate results. Similarly, you may have accepted data on one stream as the other party drops the session. There is no way to correctly recover from this - the best you can do again is to pretend that the partner dropped their end before they could have seen this.

Pekka
  • Re point 1 - I appreciate that the first ReadAsync() could come from either stream (non-deterministically but maybe biased) ... However, both read-callbacks will copy the data to the other stream, then continue waiting again ... Therefore (even though one stream might be processed quicker than the other), the end result should be that both inputs get copied to their relevant outputs and there is always something ready to catch new input if/when it should arrive. (As you say, the exact sequence of { ReadFromClient+WriteToServer } and {ReadFromServer+WriteToClient} is immaterial.) – Steven_W Mar 02 '15 at 21:59
  • Re Point 2 - If there is any bias, I would want "ReadFromClient" to take priority ... However, in practice (and recent tests), the code appears to function as expected, happily processing + capturing multiple bi-directional connections with no noticeable delay. -- I will re-read the answer+comments from @usr again but when I first read them through I was unable to see any advantage of two separate methods .. – Steven_W Mar 02 '15 at 22:09
  • Re point 2 still - In addition, (due to the packet-like content of the two streams), I would then be faced for later synchronization problems processing the conversation-ordering - I'll have a play and update the question to see if this "improves" things. – Steven_W Mar 02 '15 at 22:10
  • Finally, Point 3 - Thanks - I'm aware that there could be a number of different reasons for teardown ... Rather than cover every eventuality, I'm hoping to trap the basics and exit "as cleanly as possible" ... I'm open to suggestions for improvements but my main focus is on the "awaits" at the moment - I'll deal with the Exception-handling afterwards. – Steven_W Mar 02 '15 at 22:16
  • Point 1 was intended to caution about the possible bias that can affect fairness. Many protocols would be based on ping-pong transactions and never be hit by the race condition. It is most likely to be exposed if the connections are pooling multiple sessions that could interact and then result in different ordering. – Pekka Mar 04 '15 at 22:26
  • Yes - as it happens, this is a ping+pong (or ping+"multiple-pongs in return" ) protocol and needs to handle **multiple** but not **many** simultaneous connections... Therefore, "perfectly-balanced fairness" is not critical in this situation, though I appreciate that in a "general" case, this *possible* bias needs to be considered. – Steven_W Mar 04 '15 at 22:51