Here is a version with Rx, with timeouts included. The code as written will run in LINQPad with Rx references added. You can experiment with constructing TrueRunner, FalseRunner and SlowRunner instances in the RunnerFactory method.
Key ideas:
- Use
Select
and ToObservable()
to start and convert the async Task into an IObservable (see below for a better alternative)
- Use
Timeout()
to add a timeout to each task, which substitutes a true result if a task times out, as requested.
- Use
Where
to filter out True
results - we only get notified about false outcomes now.
Any
returns true if there are any elements in the stream, false otherwise, so flip the result of this in the subsequent Select
and we are done.
Code:
void Main()
{
Observable.Merge(
RunnerFactory().Select(x => x.Run(null).ToObservable()
.Timeout(TimeSpan.FromSeconds(1), Observable.Return(true))))
.Where(res => !res)
.Any().Select(res => !res)
.Subscribe(
res => Console.WriteLine("Result: " + res),
ex => Console.WriteLine("Error: " + ex.Message));
}
public IEnumerable<IRunner> RunnerFactory()
{
yield return new FalseRunner();
yield return new SlowRunner();
yield return new TrueRunner();
}
public interface IRunner
{
Task<bool> Run(object o);
}
public class Runner : IRunner
{
protected bool _outcome;
public Runner(bool outcome)
{
_outcome = outcome;
}
public virtual async Task<bool> Run(object o)
{
var result = await Task<bool>.Factory.StartNew(() => _outcome);
return result;
}
}
public class TrueRunner : Runner
{
public TrueRunner() : base(true) {}
}
public class FalseRunner : Runner
{
public FalseRunner() : base(false) {}
}
public class SlowRunner : Runner
{
public SlowRunner() : base(false) {}
public override async Task<bool> Run(object o)
{
var result = await Task<bool>.Factory.StartNew(
() => { Thread.Sleep(5000); return _outcome; });
return result;
}
}
The OnError handler in there is redundant given the Runner implementations I used; you may want to think about a Catch if you want to suppress Runner errors in your implementation - you can substitute an IObservable<bool>
much like I did with Timeout.
EDIT Another thing I just thought worth mentioning, is that using Observable.StartAsync
is a better way to start the Task, and will give you support for cancellation as well. Here's some modified code showing how SlowRunner can support cancellation. The token is passed in by StartAsync
and causes cancellation if the Subscription is disposed. This all happens transparently if Any
detects an element.
void Main()
{
var runners = GetRunners();
Observable.Merge(runners.Select(r => Observable.StartAsync(ct => r.Run(ct, null))
.Timeout(TimeSpan.FromSeconds(10), Observable.Return(true))))
.Where(res => !res)
.Any().Select(res => !res)
.Subscribe(
res => Console.WriteLine("Result: " + res));
}
public static IEnumerable<IRunner> GetRunners()
{
yield return new Runner(false);
yield return new SlowRunner(true);
}
public interface IRunner
{
Task<bool> Run(CancellationToken ct, object o);
}
public class Runner : IRunner
{
protected bool _outcome;
public Runner(bool outcome)
{
_outcome = outcome;
}
public async virtual Task<bool> Run(CancellationToken ct, object o)
{
var result = await Task<bool>.Factory.StartNew(() => _outcome);
return result;
}
}
public class SlowRunner : Runner
{
public SlowRunner(bool outcome) : base(outcome)
{
}
public async override Task<bool> Run(CancellationToken ct, object o)
{
var result = await Task<bool>.Factory.StartNew(() =>
{
for(int i=0; i<5; i++)
{
if(ct.IsCancellationRequested)
{
Console.WriteLine("Cancelled");
}
ct.ThrowIfCancellationRequested();
Thread.Sleep(1000);
};
return _outcome;
});
return result;
}
}