2

In my .NET 4.0 library I have a piece of code that sends data over the network and waits for a response. In order to not block the calling code the method returns a Task<T> that completes when the response is received so that the code can call the method like this:

// Send the 'message' to the given 'endpoint' and then wait for the response
Task<IResult> task = sender.SendMessageAndWaitForResponse(endpoint, message);
task.ContinueWith(
    t => 
    {
        // Do something with t.Result ...
    });

The underlying code uses a TaskCompletionSource so that it can wait for the response message without having to spin up a thread only to have it sit there idling until the response comes in:

private readonly Dictionary<int, TaskCompletionSource<IResult>> m_TaskSources
    = new Dictionary<int, TaskCompletionSource<IResult>>();

public Task<IResult> SendMessageAndWaitForResponse(int endpoint, object message)
{
    var source = new TaskCompletionSource<IResult>(TaskCreationOptions.None);
    m_TaskSources.Add(endpoint, source);

    // Send the message here ...

    return source.Task;
}

When the response is received it is processed like this:

public void CompleteWaitForResponseResponse(int endpoint, IResult value)
{
    if (m_TaskSources.ContainsKey(endpoint))
    {
        var source = m_TaskSources[endpoint];
        source.SetResult(value);
        m_TaskSources.Remove(endpoint);
    }
}

Now I want to add a time-out so that the calling code won't wait indefinitely for the response. However on .NET 4.0 that is somewhat messy because there is no easy way to time-out a task. So I was wondering if Rx would be able to do this easier. So I came up with the following:

private readonly Dictionary<int, Subject<IResult>> m_SubjectSources
    = new Dictionary<int, Subject<IResult>>();

private Task<IResult> SendMessageAndWaitForResponse(int endpoint, object message, TimeSpan timeout)
{
    var source = new Subject<IResult>();
    m_SubjectSources.Add(endpoint, source);

    // Send the message here ...

    return source.Timeout(timeout).ToTask();
}

public void CompleteWaitForResponseResponse(int endpoint, IResult value)
{
    if (m_SubjectSources.ContainsKey(endpoint))
    {
        var source = m_SubjectSources[endpoint];
        source.OnNext(value);
        source.OnCompleted();
        m_SubjectSources.Remove(endpoint);
    }
}

This all seems to work without issue, however I've seen several questions stating that Subject should be avoided so now I'm wondering if there is a more Rx-y way to achieve my goal.

Community
  • 1
  • 1
Petrik
  • 1,961
  • 13
  • 23
  • 1
    Are you developing with VS2010? Otherwise, you still can target .NET 4.0 and use [`Microsoft.Bcl.Async`](http://www.nuget.org/packages/microsoft.bcl.async), it has `TaskEx.Delay`, `CancellationTokenSource.CancelAfter` etc. – noseratio Apr 03 '14 at 09:46
  • 1
    Nope I'm on VS2012. Thanks for the pointer to the BCL library. I didn't know about that one. – Petrik Apr 03 '14 at 18:51
  • 1
    No worries, you may also want to read this: http://blogs.msdn.com/b/pfxteam/archive/2012/10/05/how-do-i-cancel-non-cancelable-async-operations.aspx – noseratio Apr 03 '14 at 19:15

1 Answers1

3

The advice to avoid using Subject in Rx is often overstated. There has to be a source for events in Rx, and it's fine for it to be a Subject.

The issue with Subject is generally when it is used in between two Rx queries that could otherwise be joined, or where there is already a well-defined conversion to IObservable<T> (such as Observable.FromEventXXX or Observable.FromAsyncXXX etc.

If you want, you can do away with the Dictionary and multiple Subjects with the approach below. This uses a single subject and returns a filtered query to the client.

It's not "better" per se, Whether this makes sense will depend on the specifics of your scenario, but it saves spawning lots of subjects, and gives you a nice option for monitoring all results in a single stream. If you were dispatching results serially (say from a message queue) this could make sense.

// you only need to synchronize if you are receiving results in parallel
private readonly ISubject<Tuple<int,IResult>, Tuple<int,IResult>> results =
    Subject.Synchronize(new Subject<Tuple<int,IResult>>());

private Task<IResult> SendMessageAndWaitForResponse(
    int endpoint, object message, TimeSpan timeout)
{           
    // your message processing here, I'm just echoing a second later
    Task.Delay(TimeSpan.FromSeconds(1)).ContinueWith(t => {
        CompleteWaitForResponseResponse(endpoint, new Result { Value = message }); 
    });

    return results.Where(r => r.Item1 == endpoint)
                  .Select(r => r.Item2)
                  .Take(1)
                  .Timeout(timeout)
                  .ToTask();
}

public void CompleteWaitForResponseResponse(int endpoint, IResult value)
{
    results.OnNext(Tuple.Create(endpoint,value));
}

Where I defined a class for results like this:

public class Result : IResult
{
    public object Value { get; set; }
}

public interface IResult
{
    object Value { get; set; }
}

EDIT - In response to additional questions in the comments.

  • No need to dispose of the single Subject - it won't leak and will be garbage collected when it goes out of scope.

  • ToTask does accept a cancellation token - but that's really for cancellation from the client side.

  • If the remote side disconnects, you can send an the error to all clients with results.OnError(exception); - you'll want to instantiate a new subject instance at the same time.

Something like:

private void OnRemoteError(Exception e)
{
    results.OnError(e);        
}

This will manifest as a faulted task to all clients in the expected manner.

It's pretty thread safe too because clients subscribing to a subject that has previously sent OnError will get an error back immediately - it's dead from that point. Then when ready you can reinitialise with:

private void OnInitialiseConnection()
{
    // ... your connection logic

    // reinitialise the subject...
    results = Subject.Synchronize(new Subject<Tuple<int,IResult>>());
}

For individual client errors, you could consider:

  • Extending your IResult interface to include errors as data
  • You can then optionally project this to a fault for just that client by extending the Rx query in SendMessageAndWaitForResponse. For example, and an Exception and HasError property to IResult so that you can do something like:

    return results.Where(r => r.Item1 == endpoint)
                .SelectMany(r => r.Item2.HasError
                    ? Observable.Throw<IResult>(r.Item2.Exception)
                    : Observable.Return(r.Item2))
                .Take(1)
                .Timeout(timeout)
                .ToTask();
    
James World
  • 29,019
  • 9
  • 86
  • 120
  • The point is, `Task.Delay` is not available in .NET 4.0. That's why I asked about `Microsoft.Bcl.Async`. Even if that's not an option, it's still trivial to implement via `System.Threading.Timer`, so I wonder what's holding @Petrik on. – noseratio Apr 03 '14 at 10:02
  • `Task.Delay` is not part of the implementation!!! That's just a hack to simulate a response. Once you are in `IObservable`, options open up that aren't available with Task. Whether you need/want them is another question. The OP was asking specifically about Subjects in Rx. – James World Apr 03 '14 at 10:12
  • @Noseratio I could certainly use `System.Threading.Timer` or an equivalent, however that adds complexity on top of what I need and I don't trust my own skills too much with all this threading / synchronization. It seems better to use Rx or TPL where other people have solved some of those problems for me :) – Petrik Apr 03 '14 at 20:05
  • @JamesWorld Thanks for that solution, it is a lot cleaner than my approach. Two (somewhat related) questions I have are how do I deal with the cancellation of the task, e.g. if the remote side disconnects I want to cancel all the outstanding tasks for that side. Currently I have a `CancellationToken` that I use for that but I'm not sure how to fit that into your approach. Secondly do I have to dispose of anything or is that done when the task completes? I'm assuming I have to eventually dispose of the main ISubject instance? – Petrik Apr 03 '14 at 20:08
  • Added some more thoughts on individual client errors. – James World Apr 03 '14 at 21:10