0

I have just started working with tasks. We have a system setup that uses requests/responses. The service running the tasks accepts a master request that has a list of request objects and returns a master response that has a list of response objects. So it looks something like this

    var MasterRequest = new MasterRequest;
    MasterRequest.Requests.Add(new BlueRequest);
    MasterRequest.Requests.Add(new RedRequest);
    MasterRequest.Requests.Add(new YellowRequest);

The request implements a simple IRequest interface and each color is a concrete class. The service has concrete classes (request processors) set up to be able to process each request separately and simultaneously according to a concrete request object. Each concrete class on the service has a GetTask method with a signature like this:

    Task<IResponse> GetTask(IRequest);
    {
       // some setup stuff
       return Task.Factory.StartNew<IResponse>(() =>
        {
           // do task stuff
           return response; // implements IResponse
         });            
    }

My service takes the passed in MasterRequest and builds a list of tasks by calling the GetTask call listed above on the concrete request processors. I then use a Parallel.ForEach on the list to process the tasks.

    // this is what is returned from the service.
    // it has a List<IResponse> on it to hold  the resposnes
    MasterResposne resposne = new MasterResponse();

    List<Task<IResponse>> tasks = new List<Task<IResponse>>();

    foreach(IRequest req in MasterRequest.Requests)
    {
        // factory to get the proper request processor
        RequestProcessor p  = rp.GetProcessor(req);

        tasks.add(p.GetTask(req));
     }

     Parallel.ForEach(tasks, t =>
        {
             t.Wait();

              // check for faulted and cancelled 
              // this is where I need help

              response.Responses.Add(t.Result);
         }

This all works great. But if the task throws an exception I don't know how to tie it back to the specific concrete request that triggered it. I need to know so I can pass back a properly built response to the caller.

My first thought was to subclass Task but that brings up it's own set of issues that I don't want to deal with.

I read this SO article and it seems like I want to do something like this

Is this ok to derive from TPL Task to return more details from method?

I think Reed's second example is my solution but I still cannot see how to run the tasks simultaneously and be able to tie exceptions to the request so I can return a properly built list of responses.

Thanks in advance.

brian
  • 37
  • 5
  • 1) I think the recommended way to start a Task is Task.Run() 2) You can dig into TaskCompletionSource. Here some links that helped me a lot: https://msdn.microsoft.com/en-us/magazine/jj991977.aspx https://learn.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/interop-with-other-asynchronous-patterns-and-types#EAP https://scalablenotions.wordpress.com/2015/05/02/tpl-and-async-await-best-practices-for-the-busy-developer/ – Fildor Aug 08 '17 at 14:30
  • 5
    Maybe Task.WaitAll() will be better than Parallel? – ASpirin Aug 08 '17 at 14:35
  • @Fildor that is a good link. I have some other requirements that force me to have a custom task scheduler so I am forced to use the Task.Factory.StarNew call. – brian Aug 08 '17 at 16:14

2 Answers2

0

So I was able to use Reed's solution from the link I supplied. My service code to process the requests turned into this

// this is what is returned from the service.
// it has a List<IResponse> on it to hold  the resposnes
MasterResposne resposne = new MasterResponse();

List<ExecutionResult> tasks = new List<ExecutionResult>();

foreach(IRequest req in MasterRequest.Requests)
{
    // factory to get the proper request processor
    RequestProcessor p  = rp.GetProcessor(req);

    tasks.add(p.GetResult(req));
 }

 Parallel.ForEach(tasks, t =>
    {
         t.task.Wait();

          response.Responses.Add(t.Result);
     }

Where ExecutionResult is defined like so

    class ExecutionResult
    {
       public IResult Result;
       public Task<IResponse> task;

    }

That gives me access to a pre-built response object so I can pass it back to the caller.

EDIT: So I reviewed my Parallel.ForEach and was able to redo my code and use await Task.WhenAll as suggested. New code looks more like this:

    // this is what is returned from the service.
    // it has a List<IResponse> on it to hold  the resposnes
    MasterResposne resposne = new MasterResponse();

    List<ExecutionResult> tasks = new List<ExecutionResult>();
    List<ExecutionResult> executionResults = new List<ExecutionResult>();

    foreach(IRequest req in MasterRequest.Requests)
    {
        // factory to get the proper request processor
        RequestProcessor p  = rp.GetProcessor(req);

        ExecutionResult er = engine.GetResult(req);
        executionResults.Add(er);
        tasks.Add(er.Task);
     }

      await Task.WhenAll<IResponse>(tasks);

      foreach (ExecutionResult r in executionResults)
      {
          if (r.Task.IsCompleted)
          {
              response.AddResponse(r.Task.Result);
          }
          else
          {
              r.Response.Status = false;
              AggregateException flat = r.Task.Exception.Flatten();

              foreach (Exception e in flat.InnerExceptions)
              {
                  Log.ErrorFormat("Reqest [{0}] threw [{1}]", r.Response.RequestId, e);
                  r.Response.StatusReason.AppendLine(e.Message);
              }
          }
      }

This allows me to tie my request information to my task and get the response back that I need to return to my caller.

Thanks for the guidance.

brian
  • 37
  • 5
  • using `Task.Wait()` for several tasks in combination with `Parallel.ForEach` is the strangest thing I've encountered so far regarding to waiting for Tasks to complete. Is there a very good reason not to use `await Task.WhenAll()`? – Peter Bons Aug 08 '17 at 16:54
  • Also, partially answering your own question is not very useful, can you update the original question instead, so a new person taking a look at this question has a single point of view for the question? – Peter Bons Aug 08 '17 at 16:55
  • @PeterBons What makes you think this is only a partial answer, and not a finished answer? – Servy Aug 08 '17 at 16:58
  • @PeterBons The use of `Parallel` here actually *breaks* the code, because the `response` is unsafely accessed from multiple threads. It should just be a regular `foreach`. – Servy Aug 08 '17 at 16:59
  • @Servy I think it is partial because the comments in the code of the answer states he still needs help to check for faulted and cancelled tasks. The question title itself seems to refer to those comments. – Peter Bons Aug 08 '17 at 17:01
  • @PeterBons I edited my solution and removed my "I need help here" comment. – brian Aug 08 '17 at 18:12
  • @Servy the response object is a ConcurrentBag so I can dump values in there from multiple thread. – brian Aug 08 '17 at 18:13
  • @brian So then if you really do do it right then at best your code is just slower and more complex, for nothing in return. – Servy Aug 08 '17 at 19:49
  • @Servy I changed the underlying to a List instead of the ConcurrentBag since I changed the code to a Foreach instead of the previous Parallel.ForEach. That should speed things up a little. Thanks. – brian Aug 08 '17 at 21:13
0

I then use a Parallel.ForEach on the list to process the tasks.

This is actually pretty bad. It's throwing a ton of threads into the mix just to block on the tasks completing.

But if the task throws an exception I don't know how to tie it back to the specific concrete request that triggered it. I need to know so I can pass back a properly built response to the caller.

Whenever you have a "process tasks after they complete" kind of problem, usually the best solution is a higher-level asynchronous operation:

private async Task<IResponse> ProcessAsync(IRequest request)
{
  try
  {
    return await engine.GetResult(request);
  }
  catch (Exception ex)
  {
    IResponse result = /* create error response */;
    return result;
  }
}

This allows a much simpler main function:

MasterResposne resposne = new MasterResponse();

var tasks = MasterRequest.Requests.Select(req => ProcessAsync(req));
response.AddRange(await Task.WhenAll(tasks));
Stephen Cleary
  • 437,863
  • 77
  • 675
  • 810