I have some software with an event-based networking protocol for control, which uses an IObservable<Event> to handle inbound messages.

In many cases a sent message expects a specific response (or a sequence of responses, such as progress reports). To avoid missing the response, a task is set up in advance with FirstAsync and ToTask; however, these appear to leak if the task never completes.

Simply putting evtTask in a using block does not work either, because disposing an incomplete task is not allowed.
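
The restriction can be reproduced without Rx at all. The following is a minimal sketch; `DisposeDemo` and its method are hypothetical names used only for illustration, and a `TaskCompletionSource` stands in for a task that never completes.

```csharp
using System;
using System.Threading.Tasks;

public static class DisposeDemo
{
    // Returns true if disposing a not-yet-completed task throws.
    public static bool DisposeIncompleteThrows()
    {
        // A task that is never completed, standing in for evtTask.
        var tcs = new TaskCompletionSource<int>();
        try
        {
            tcs.Task.Dispose(); // Task.Dispose requires a completed task
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```

This is why a plain `using (evtTask)` cannot serve as the clean-up mechanism: leaving the block early is exactly the case where the task is still incomplete.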

var jobUuid = Guid.NewGuid();
var evtTask = Events.FirstAsync((x) => x.Action == Action.JobComplete && x.JobUuid == jobUuid).ToTask();
// e.g. if this throws without ever sending the message
await SendMessage($"job {jobUuid} download {url}");
var evt = await evtTask;
if (evt.Success)
{
    ...
}

Does the library provide a simple means for this use case that will unsubscribe on leaving the scope? Something like this:

var jobUuid = Guid.NewGuid();
using (var evtTask = Events.FirstAsync((x) => x.Action == Action.JobComplete && x.JobUuid == jobUuid)
    .ToDisposableTask()) // Some method like this
{
    // e.g. if this throws without ever sending the message
    await SendMessage($"job {jobUuid} download {url}");
    var evt = await evtTask;
    if (evt.Success)
    {
        ...
    }
} // Get rid of the FirstAsync task if we leave here before it completes, for any reason
Fire Lancer
  • So what exactly leaks and how do you know it does? – Evk Mar 14 '18 at 16:07
  • The subscription to the observable never gets unsubscribed (FirstAsync only unsubscribes on the first result). The lambda keeps getting called (even though it's never going to meet that GUID check), and for e.g. a `Subject` you can see the leftover entries in its `_observer` field. – Fire Lancer Mar 14 '18 at 16:11
  • So what exactly are you trying to achieve? We need more information than that – Fabjan Mar 14 '18 at 16:12
  • More than the example code? For it to handle things like `SendMessage` throwing, but the `Task` can't go in a `using` block as mentioned. – Fire Lancer Mar 14 '18 at 16:13
  • You can add a [Timeout](https://stackoverflow.com/questions/4238345/asynchronously-wait-for-taskt-to-complete-with-timeout) for the task so it will throw something like `TimeoutException` after a period of time... – Fabjan Mar 14 '18 at 16:55
  • I guess like a Timeout of 15 minutes or something would be an idea, but potentially many could stack up. The `using` block seems the right tool, some sort of `ToDisposableTask()` that unsubscribes itself? – Fire Lancer Mar 14 '18 at 16:58
  • @FireLancer Even though `Task` implements `IDisposable`, it's very unlikely that you would need to dispose the task explicitly. If you do, it's mostly to dispose the underlying `WaitHandle` object used by `Task`. In [this](https://blogs.msdn.microsoft.com/pfxteam/2012/03/25/do-i-need-to-dispose-of-tasks/) article from MSDN, I quote: "In reality, very few tasks actually have their WaitHandle allocated...". One possible scenario is when we are working with the old APM (asynchronous programming model) on a newer .NET Framework version. – Fabjan Mar 14 '18 at 17:13
  • That is what I meant: putting a normal task in a `using` is (almost) never helpful. It actually just throws an `InvalidOperationException` if you try to Dispose before completion. – Fire Lancer Mar 15 '18 at 10:03

2 Answers

Disposing the Task will not help, since it does nothing useful (in most situations, including this one). What will help is cancelling the task: cancellation disposes the underlying subscription created by ToTask and so resolves this "leak".

So it can go like this:

Task<Event> evtTask;
using (var cts = new CancellationTokenSource()) {
    evtTask = Events.FirstAsync((x) => x.Action == Action.JobComplete && x.JobUuid == jobUuid)
             .ToTask(cts.Token);
    // e.g. if this throws without ever sending the message
    try {
        await SendMessage($"job {jobUuid} download {url}");
    }
    catch {
        cts.Cancel(); // disposes subscription
        throw;
    }
}
var evt = await evtTask;
if (evt.Success)
{
    ...
}

Of course you can wrap that in a more convenient form (such as an extension method). For example:

public static class ObservableExtensions {
    public static CancellableTaskWrapper<T> ToCancellableTask<T>(this IObservable<T> source) {
        return new CancellableTaskWrapper<T>(source);
    }

    public class CancellableTaskWrapper<T> : IDisposable
    {
        private readonly CancellationTokenSource _cts;
        public CancellableTaskWrapper(IObservable<T> source)
        {
            _cts = new CancellationTokenSource();
            Task = source.ToTask(_cts.Token);
        }

        public Task<T> Task { get; }

        public void Dispose()
        {
            _cts.Cancel();
            _cts.Dispose();
        }
    }
}

Then it becomes close to what you want:

var jobUuid = Guid.NewGuid();
using (var evtTask = Events.FirstAsync((x) => x.Action == Action.JobComplete && x.JobUuid == jobUuid).ToCancellableTask()) {
    await SendMessage($"job {jobUuid} download {url}");
    var evt = await evtTask.Task;
    if (evt.Success) {
        ...
    }
}
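
One way to check that cancellation really drops the subscription is to watch `Subject<T>.HasObservers` before and after calling `Cancel`. The sketch below is an illustration only: `CancelDemo` is a hypothetical name, and a `Subject<int>` stands in for `Events`.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

public static class CancelDemo
{
    // Reports whether the subject had observers before and after cancellation.
    public static (bool Before, bool After) Run()
    {
        var events = new Subject<int>(); // hypothetical stand-in for Events
        using (var cts = new CancellationTokenSource())
        {
            // Subscribes immediately; the predicate never matches because nothing is emitted.
            Task<int> task = events.FirstAsync(x => x == 42).ToTask(cts.Token);
            bool before = events.HasObservers;

            // Cancelling the token makes ToTask dispose its subscription.
            cts.Cancel();
            bool after = events.HasObservers;

            return (before, after);
        }
    }
}
```

With the wrapper above, the same thing happens when the `using` block exits, whether normally or via an exception from `SendMessage`.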
Evk

You can either use TPL Timeout (as referenced by @Fabjan), or the Rx/System.Reactive version of Timeout.
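
As a rough sketch of the Rx variant, the `Timeout` operator can be placed before `ToTask`, so the awaited task fails with a `TimeoutException` instead of waiting forever. `TimeoutDemo`, the `Subject<int>` standing in for `Events`, and the 100 ms limit are all assumptions made for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class TimeoutDemo
{
    // Returns true if waiting for an event that never arrives ends in a TimeoutException.
    public static async Task<bool> TimesOutAsync()
    {
        var events = new Subject<int>(); // hypothetical stand-in for Events; never emits
        try
        {
            await events.FirstAsync(x => x == 42)
                        .Timeout(TimeSpan.FromMilliseconds(100)) // illustrative limit
                        .ToTask();
            return false;
        }
        catch (TimeoutException)
        {
            return true;
        }
    }
}
```

Note that this only bounds how long the subscription lives; until the timeout fires, the predicate is still evaluated for every incoming event.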

using sounds nice, but doesn't make sense here. A using block is equivalent to calling .Dispose on something at the end of the block. The problem here, I'm assuming, is that your code never gets past await evtTask. Throwing all of that in a hypothetical using wouldn't change anything: your code is still waiting forever.

At a higher level, your code is more imperative than reactive; you may want to refactor it to something like this:

var subscription = Events
    .Where(x => x.Action == Action.JobComplete)
    .Subscribe(x => 
    {
        if(x.Success)
        {
            //...
        }
        else
        {
            //...
        }
    });
Shlomo
  • Getting stuck on the await either can't happen or can indeed be handled by a timeout or similar if it gets that far; I was speaking more of the call just before it throwing an exception, for example. I did try using `Subscribe`, but then I need a `TaskCompletionSource` and such to wait on, with that `subscription` in a using, because these are single responses (think HTTP request-response), not "push events" saying stuff happened on the server. – Fire Lancer Mar 15 '18 at 10:00
  • For single request you can add a `Take(1)` or `FirstAsync` as you have. Single responses are singular push events. Aggregate them together and you have an observable stream. It's just a matter of perspective. – Shlomo Mar 15 '18 at 16:28
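
The combination discussed in these comments (`Subscribe` plus `Take(1)`, with a `TaskCompletionSource` to await) might look roughly like the following. This is only a sketch under assumptions: `SubscribeDemo` is a hypothetical name, a `Subject<int>` stands in for `Events`, and the two `OnNext` calls simulate the server's replies.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

public static class SubscribeDemo
{
    public static async Task<(int Value, bool StillSubscribed)> RunAsync()
    {
        var events = new Subject<int>(); // hypothetical stand-in for Events
        var tcs = new TaskCompletionSource<int>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        int value;

        // The subscription is an IDisposable, so it can live in a using block.
        using (events.Where(x => x == 42)
                     .Take(1)
                     .Subscribe(x => tcs.TrySetResult(x),
                                ex => tcs.TrySetException(ex)))
        {
            events.OnNext(7);   // ignored by the filter
            events.OnNext(42);  // simulated response; completes the task
            value = await tcs.Task;
        }

        // After Take(1) completes (or the using block exits), nothing is subscribed.
        return (value, events.HasObservers);
    }
}
```

If the code between subscribing and awaiting throws, the `using` block still disposes the subscription, which is the behaviour the question asks for.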