2

I have an interface:

public interface IRunner
{
    Task<bool> Run(object o);
}

and I have some classes that implement it using async, for example:

public class Runner1 : IRunner
{
    public async Task<bool> Run(object o)
    {
        var num  = await SomeExternalAsyncFunc(o);
        return num < 12;
    }
}

I need to implement a function that runs on all of the Runners classes in parallel, and returns true only if all of them returned true. After reading this and this I came up with the following implementation:

public class RunMannager
{
    public async Task<bool> Run(ConcurrentBag<IRunner> runnersBag, object o)
    {
        var results = new ConcurrentBag<bool>();
        var tasks = runnersBag.Select(async runner => results.Add(await runner.Run(o)));
        await Task.WhenAll(tasks);
        return results.All(result => result);
     }
}

However, I have two problems with this implementation:

  • I would like that if one of the runners already returned false, the function should not wait for all the others.

  • Some of the runners may never return, I would like to use a timeout. If the runner didn't return anything for 10 seconds it will be considered as true was returned.

Maybe using reactive extensions can help?

Alon
  • 353
  • 1
  • 10

2 Answers2

2

Here is a version with Rx, with timeouts included. The code as written will run in LINQPad with Rx references added. You can experiment with constructing TrueRunner, FalseRunner and SlowRunner instances in the RunnerFactory method.

Key ideas:

  • Use Select and ToObservable() to start and convert the async Task into an IObservable (see below for a better alternative)
  • Use Timeout() to add a timeout to each task, which substitutes a true result if a task times out, as requested.
  • Use Where to filter out True results - we only get notified about false outcomes now.
  • Any returns true if there are any elements in the stream, false otherwise, so flip the result of this in the subsequent Select and we are done.

Code:

void Main()
{
    Observable.Merge(
        RunnerFactory().Select(x => x.Run(null).ToObservable()
        .Timeout(TimeSpan.FromSeconds(1), Observable.Return(true))))
        .Where(res => !res)
        .Any().Select(res => !res)
        .Subscribe(
            res => Console.WriteLine("Result: " + res),
            ex => Console.WriteLine("Error: " + ex.Message));                                           
}

public IEnumerable<IRunner> RunnerFactory()
{
    yield return new FalseRunner();
    yield return new SlowRunner();
    yield return new TrueRunner();
}

public interface IRunner
{
    Task<bool> Run(object o);
}

public class Runner : IRunner
{
    protected bool _outcome;

    public Runner(bool outcome)
    {
        _outcome = outcome;
    }

    public virtual async Task<bool> Run(object o)
    {
        var result = await Task<bool>.Factory.StartNew(() => _outcome);     
        return result;
    }
}

public class TrueRunner : Runner
{
    public TrueRunner() : base(true) {}
}   

public class FalseRunner : Runner
{
    public FalseRunner() : base(false) {}
}   

public class SlowRunner : Runner
{
    public SlowRunner() : base(false) {}

    public override async Task<bool> Run(object o)
    {
        var result = await Task<bool>.Factory.StartNew(
            () => { Thread.Sleep(5000); return _outcome; });        
        return result;
    }
}   

The OnError handler in there is redundant given the Runner implementations I used; you may want to think about a Catch if you want to suppress Runner errors in your implementation - you can substitute an IObservable<bool> much like I did with Timeout.

EDIT Another thing I just thought worth mentioning, is that using Observable.StartAsync is a better way to start the Task, and will give you support for cancellation as well. Here's some modified code showing how SlowRunner can support cancellation. The token is passed in by StartAsync and causes cancellation if the Subscription is disposed. This all happens transparently if Any detects an element.

void Main()
{
    var runners = GetRunners();     

    Observable.Merge(runners.Select(r => Observable.StartAsync(ct => r.Run(ct, null))
                    .Timeout(TimeSpan.FromSeconds(10), Observable.Return(true))))
                    .Where(res => !res)
                    .Any().Select(res => !res)
                    .Subscribe(
                        res => Console.WriteLine("Result: " + res));
}

public static IEnumerable<IRunner> GetRunners()
{
    yield return new Runner(false);
    yield return new SlowRunner(true);
}

public interface IRunner
{
    Task<bool> Run(CancellationToken ct, object o);
}

public class Runner : IRunner
{
    protected bool _outcome;

    public Runner(bool outcome)
    {
        _outcome = outcome;
    }

    public async virtual Task<bool> Run(CancellationToken ct, object o)
    {
        var result = await Task<bool>.Factory.StartNew(() => _outcome);
        return result;
    }
}

public class SlowRunner : Runner
{
    public SlowRunner(bool outcome) : base(outcome)
    {
    }

    public async override Task<bool> Run(CancellationToken ct, object o)
    {
        var result = await Task<bool>.Factory.StartNew(() => 
        {
            for(int i=0; i<5; i++)
            {
                if(ct.IsCancellationRequested)
                {
                    Console.WriteLine("Cancelled");                     
                }
                ct.ThrowIfCancellationRequested();
                Thread.Sleep(1000);
            };
            return _outcome;
        });
        return result;
    }
}
James World
  • 29,019
  • 9
  • 86
  • 120
0

How about using Parallel.ForEach()? Following code should give you the idea of what I mean.

You could define CancellationTokenSource

CancellationTokenSource cancellationToken = new CancellationTokenSource();
ParallelOptions po = new ParallelOptions();
po.CancellationToken = cancellationToken.Token;

Then pass po to Parallel.ForEach

Parallel.ForEach(items, po, item =>
{
   //...
   if(num < 12)
     cancellationToken.Cancel(false);

});

return !cancellationToken.IsCancellationRequested;
VladL
  • 12,769
  • 10
  • 63
  • 83
  • according to [this](http://stackoverflow.com/questions/11564506/nesting-await-in-parallel-foreach ) Parallel.ForEach with async finishes before all the async tasks return – Alon Oct 19 '13 at 10:32