
I'm working with a fairly specific web API that uses TCP and an SSL stream. The connection to the API is persistent (it should not be closed after each write/read), because the same connection is used to receive event notifications from the server.

So I implemented a read loop:

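private void Listen()
{
    // Allocate the buffer once and reuse it for every read.
    var buffer = new byte[TcpClient.ReceiveBufferSize];
    int length;

    // Read returns 0 once the remote side has closed the connection.
    // The loop itself keeps reading, so no recursive call to Listen() is needed.
    while ((length = SslStream.Read(buffer, 0, buffer.Length)) > 0)
    {
        // Process the first `length` bytes of the buffer
    }

    //reconnect
}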

Now I need to call some API methods, and I want them to support the TPL (async and await). The problem is that the asynchronous part of these methods is actually implemented in the reading method above (Listen). For example:

public async Task<bool> Authenticate(string clientId, string clientSecret)
{
    await SendAuthMessageOverNetwork(clientId, clientSecret);
    return await ReceiveAuthResponseFromServer();
    // The problem is that there is no ReceiveAuthResponseFromServer method - 
    // the response is received in reading thread (Listen).
}

I'm not very familiar with the TPL yet. I solved this problem by using a Task that does nothing except wait for a signal (an AutoResetEvent) from the reading thread:

private AutoResetEvent _loginEvent;
private bool _loginResult;

public async Task<bool> Authenticate(string clientId, string clientSecret)
{
    await SendAuthMessageOverNetwork(clientId, clientSecret);
    return await Task<bool>.Factory.StartNew(() => { _loginEvent.WaitOne(); return _loginResult; });
}

private void Listen()
{
    ...
    if (msg.Type == MessageTypes.AuthenticationResponse)
    {
        _loginResult = true;
        _loginEvent.Set();
    }
    ...
}

But I don't really like this solution. Is there a simpler way to implement what I want? Can it be done using only Task functionality, without the AutoResetEvent and the intermediate shared variable?

Aleksey Shubin

1 Answer

You're right that using an AutoResetEvent is the wrong way to do it. What you actually want to do is:

  • Assuming every SendAuthMessageOverNetwork is matched by a ReceiveAuthResponseFromServer, combine them into one method.
  • In that method, send the request and place a new TaskCompletionSource into a queue that can be seen by the read loop.
  • Return the task completion source's Task property as your result.
  • In the read loop, when you read a message, dequeue the next completion source from the queue and call taskSource.SetResult(responseFromServer).

So something like this:

private readonly Queue<TaskCompletionSource<ResponseMessageType>> _responseQueue =
    new Queue<TaskCompletionSource<ResponseMessageType>>();

public async Task<bool> Authenticate(string clientId, string clientSecret) {
    var response = await AsyncRequestAResponse(MakeAuthMessage(clientId, clientSecret));
    return response.Type == MessageTypes.AuthenticationResponse;
}

public Task<ResponseMessageType> AsyncRequestAResponse(RequestMessageType request) {
    var responseSource = new TaskCompletionSource<ResponseMessageType>();
    // Enqueue before sending so the read loop can always find the source.
    _responseQueue.Enqueue(responseSource);
    Send(request);
    return responseSource.Task;
}

private void Listen() {
    ...
    if (_responseQueue.Count == 0)
        throw new Exception("Erm, why are they responding before we requested anything?");
    // Completes the task that AsyncRequestAResponse handed to the caller.
    _responseQueue.Dequeue().SetResult(msg);
}

In other words, use TaskCompletionSource<T> to translate from the network read/write stuff you do internally into the async stuff you expose to your callers.

It probably won't look exactly like the above... I assumed there was an inherent ordering and a one-to-one mapping between requests and responses in the example. You might have to match up request IDs with response IDs or something like that, and have timeouts that put exceptions instead of results into the queued tasks, and so on.
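If the protocol does carry request IDs, the matching and the timeouts could be sketched roughly as follows. This is only an illustration under assumptions: the PendingRequests class, its Register and Complete methods, and the idea that IDs are plain integers are all hypothetical and not part of the API in the question.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: maps request IDs to the tasks awaiting their responses.
public sealed class PendingRequests<TResponse>
{
    private readonly ConcurrentDictionary<int, TaskCompletionSource<TResponse>> _pending =
        new ConcurrentDictionary<int, TaskCompletionSource<TResponse>>();
    private int _lastId;

    // Called by the sender before writing the request to the stream.
    // The returned ID is assumed to be embedded in the outgoing message.
    public Task<TResponse> Register(TimeSpan timeout, out int requestId)
    {
        int id = Interlocked.Increment(ref _lastId);
        var source = new TaskCompletionSource<TResponse>();
        _pending[id] = source;

        // If the entry is still pending when the delay elapses, fail the task
        // instead of leaving the caller waiting forever.
        Task.Delay(timeout).ContinueWith(_ =>
        {
            TaskCompletionSource<TResponse> expired;
            if (_pending.TryRemove(id, out expired))
                expired.TrySetException(new TimeoutException("No response for request " + id));
        });

        requestId = id;
        return source.Task;
    }

    // Called by the read loop when a response carrying requestId arrives.
    // Returns false for unknown or already timed-out IDs.
    public bool Complete(int requestId, TResponse response)
    {
        TaskCompletionSource<TResponse> source;
        return _pending.TryRemove(requestId, out source) && source.TrySetResult(response);
    }
}
```

Whichever side removes the entry from the dictionary first (the read loop or the timeout) decides the outcome, so a late response for a request that has already timed out is simply reported as unknown.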

Also, if responses can be arriving concurrently with requests being sent, then it is important to place locks around the operations touching the queue.
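A minimal sketch of that locking, assuming strict FIFO matching as in the example above. The ResponseQueue class and its Enqueue and Deliver methods are hypothetical names, and the send delegate stands in for whatever actually writes the request to the stream.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical wrapper around the queue of waiting completion sources.
public sealed class ResponseQueue<TResponse>
{
    private readonly object _gate = new object();
    private readonly Queue<TaskCompletionSource<TResponse>> _waiting =
        new Queue<TaskCompletionSource<TResponse>>();

    // Sender side: enqueue and send under one lock, so the order of the
    // queue always matches the order of requests on the wire.
    public Task<TResponse> Enqueue(Action send)
    {
        var source = new TaskCompletionSource<TResponse>();
        lock (_gate)
        {
            _waiting.Enqueue(source);
            send();
        }
        return source.Task;
    }

    // Read-loop side: take the oldest waiting source and complete it.
    public void Deliver(TResponse response)
    {
        TaskCompletionSource<TResponse> source;
        lock (_gate)
        {
            if (_waiting.Count == 0)
                throw new InvalidOperationException("Response received with no request pending.");
            source = _waiting.Dequeue();
        }
        // Complete outside the lock so caller continuations never run while it is held.
        source.SetResult(response);
    }
}
```

Holding the lock while sending means concurrent senders block each other for the duration of the write; that is the price of keeping queue order and wire order identical in this sketch.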

Craig Gidney
  • 3
    There is a subtle moment here @AlekseyShubin should be aware of. If the thread where `await Task` was initiated is a non-UI thread with default synchronization context, calling `_responseQueue.Dequeue().SetResult(msg)` will immediately invoke the continuation callback (the code after `await`) on the thread where `Listen()` is operating. Thus, `Listen` will be blocked until there's another `await` in the code flow, on the consumer side. This potentially may lead to a deadlock, depending on the logic workflow. More info: http://stackoverflow.com/q/19481964/1768303 – noseratio Nov 05 '13 at 07:39
  • 1
    @Noseratio Right. Never ever block on a task, unless you understand how TPL works and how the task is fulfilled from top to bottom. – Craig Gidney Nov 05 '13 at 08:41
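On the point about continuations running on the read thread: on .NET Framework 4.6 or later, TaskCompletionSource<T> can be created with TaskCreationOptions.RunContinuationsAsynchronously, so that SetResult queues the continuations instead of running them inline. A minimal sketch, where the CompletionSources helper is a hypothetical name:

```csharp
using System.Threading.Tasks;

public static class CompletionSources
{
    // Hypothetical factory for sources that the read loop will complete.
    // With this option, SetResult/SetException do not run the awaiting
    // code inline on the thread that calls them.
    public static TaskCompletionSource<T> CreateForReadLoop<T>()
    {
        return new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
    }
}
```

The read loop would then enqueue sources created this way and complete them as before; only the thread on which the awaiting code resumes changes.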