
I have a class that is responsible for retrieving product availability by calling a legacy class. This legacy class internally collects product data by making BLOCKING network calls, and I cannot modify the code of the legacy API. Since all products are independent of each other, I would like to parallelise collecting the information without creating any unnecessary threads, and without blocking the calling thread while the legacy API calls are in progress. With this background, here are my basic classes:

class Product
    {
        public int ID { get; set; }
        public int  VendorID { get; set; }
        public string Name { get; set; }
    }

    class ProductSearchResult
    {
        public int ID { get; set; }
        public int AvailableQuantity { get; set; }
        public DateTime ShipDate { get; set; }
        public bool Success { get; set; }
        public string Error { get; set; }
    }

class ProductProcessor
    {
        List<Product> products;
        private static readonly SemaphoreSlim mutex = new SemaphoreSlim(2);
        CancellationTokenSource cts = new CancellationTokenSource();
        public ProductProcessor()
        {
            products = new List<Product>()
            {
                new Product() { ID = 1, VendorID = 100, Name = "PC" },
                new Product() { ID = 2, VendorID = 101, Name = "Tablet" },
                new Product() { ID = 3, VendorID = 100, Name = "Laptop" },
                new Product() { ID = 4, VendorID = 102, Name = "GPS" },
                new Product() { ID = 5, VendorID = 107, Name = "Mars Rover" }
            };

        }

        public async void Start()
        {
            Task<ProductSearchResult>[] tasks = new Task<ProductSearchResult>[products.Count];
            Parallel.For(0, products.Count(), async i =>
            {
                tasks[i] = RetrieveProductAvailablity(products[i].ID, cts.Token);

            });



            Task<ProductSearchResult> results = await Task.WhenAny(tasks);

            // Logic for waiting on individual tasks and reporting results

        }

        private async Task<ProductSearchResult> RetrieveProductAvailablity(int productId, CancellationToken cancellationToken)
        {
            ProductSearchResult result = new ProductSearchResult();
            result.ID = productId;

            if (cancellationToken.IsCancellationRequested)
            {
                result.Success = false;
                result.Error = "Cancelled.";
                return result;
            }

            try
            {
                await mutex.WaitAsync();
                if (cancellationToken.IsCancellationRequested)
                {
                    result.Success = false;
                    result.Error = "Cancelled.";
                    return result;
                }

                LegacyApp app = new LegacyApp();
                bool success = await Task.Run(() => app.RetrieveProductAvailability(productId));
                if (success)
                {
                    result.Success = success;
                    result.AvailableQuantity = app.AvailableQuantity;
                    result.ShipDate = app.ShipDate;
                }
                else
                {
                    result.Success = false;
                    result.Error = app.Error;
                }
            }
            finally
            {
                mutex.Release();
            }

            return result;

        }

    }

Given that I am trying to wrap async over a synchronous API, I have two questions.

  1. By using Parallel.For and wrapping the legacy API call in Task.Run, am I creating any unnecessary threads that could have been avoided? The calling thread must not be blocked, because we will use this code in a UI.
  2. Does this code still look thread-safe?
Yuval Itzchakov
whoami

2 Answers


The compiler will give you a warning about your async lambda. Read it carefully; it's telling you that the lambda isn't actually asynchronous. There's no point in using async there. Also, do not use async void.

Since your underlying API is blocking - and there's no way to change that - asynchronous code isn't an option. I'd recommend either using several Task.Run calls or Parallel.For, but not both. So let's use parallel. Actually, let's use Parallel LINQ since you're transforming a sequence.

There's no point in making RetrieveProductAvailablity asynchronous; it's only doing blocking work except for the throttling, and the parallel approach has more natural throttling support. This leaves your method looking like:

private ProductSearchResult RetrieveProductAvailablity(int productId, CancellationToken cancellationToken)
{
  ... // no mutex code
  LegacyApp app = new LegacyApp();
  bool success = app.RetrieveProductAvailability(productId);
  ... // no mutex code
}

You can then do parallel processing as such:

public void Start()
{
  ProductSearchResult[] results = products.AsParallel().AsOrdered()
      .WithCancellation(cts.Token).WithDegreeOfParallelism(2)
      .Select(product => RetrieveProductAvailablity(product.ID, cts.Token))
      .ToArray();
  // Logic for reporting results
}

From your UI thread, you can call the method using Task.Run:

async void MyUiEventHandler(...)
{
  await Task.Run(() => processor.Start());
}

This keeps your business logic clean (only synchronous/parallel code), and the responsibility for moving this work off the UI thread (using Task.Run) belongs to the UI layer.

Update: I added a call to AsOrdered to ensure the results array has the same order as the products sequence. This may or may not be necessary, but since the original code preserved order, this code does now too.
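To see the PLINQ approach end to end, here is a minimal self-contained sketch. FakeLegacyApp is a hypothetical stand-in for the real LegacyApp (it sleeps to simulate the blocking call and records how many calls overlap), and SearchResult is a simplified version of ProductSearchResult; neither is part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Hypothetical stand-in for LegacyApp: a blocking call plus result properties.
public class FakeLegacyApp
{
    private static int current, maxSeen;   // concurrent callers now / highest observed
    public static int MaxConcurrency => Volatile.Read(ref maxSeen);

    public int AvailableQuantity { get; private set; }

    public bool RetrieveProductAvailability(int productId)
    {
        int now = Interlocked.Increment(ref current);
        // Record the highest number of simultaneous calls seen so far.
        int seen;
        while (now > (seen = Volatile.Read(ref maxSeen)) &&
               Interlocked.CompareExchange(ref maxSeen, now, seen) != seen) { }
        Thread.Sleep(50);                  // simulate the BLOCKING network call
        Interlocked.Decrement(ref current);
        AvailableQuantity = productId * 10;
        return true;
    }
}

// Simplified result type (hypothetical).
public record SearchResult(int ID, bool Success, int AvailableQuantity);

public static class PlinqSketch
{
    public static SearchResult[] Retrieve(IEnumerable<int> productIds, CancellationToken token) =>
        productIds.AsParallel().AsOrdered()   // keep results in input order
            .WithCancellation(token)          // a cancelled token makes ToArray throw OperationCanceledException
            .WithDegreeOfParallelism(2)       // at most 2 blocking calls at once
            .Select(id =>
            {
                var app = new FakeLegacyApp();
                bool ok = app.RetrieveProductAvailability(id);   // plain synchronous call
                return new SearchResult(id, ok, app.AvailableQuantity);
            })
            .ToArray();
}
```

With WithDegreeOfParallelism(2), no more than two blocking calls should overlap, and AsOrdered keeps the results in the same order as the input IDs.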

Update: Since you need to update the UI after every retrieval, you should probably use Task.Run for each one instead of AsParallel:

public async Task Start()
{
  var tasks = products.Select(product =>
      ProcessAvailabilityAsync(product.ID, cts.Token));
  await Task.WhenAll(tasks);
}

private readonly SemaphoreSlim mutex = new SemaphoreSlim(2);
private async Task ProcessAvailabilityAsync(int id, CancellationToken token)
{
  await mutex.WaitAsync();
  try
  {
    var result = await Task.Run(() => RetrieveProductAvailablity(id, token));
    // Logic for reporting results
  }
  finally
  {
    mutex.Release();
  }
}
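A self-contained sketch of this per-item pattern, under stated assumptions: BlockingLookup is a hypothetical stand-in for LegacyApp.RetrieveProductAvailability, and the report callback is a placeholder for whatever UI update you do. The semaphore is awaited before Task.Run, so at most two thread-pool threads are ever tied up by the blocking call.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledRunSketch
{
    private static readonly SemaphoreSlim throttle = new SemaphoreSlim(2);  // max 2 in flight
    private static int current, maxSeen;
    public static int MaxConcurrency => Volatile.Read(ref maxSeen);

    // Hypothetical blocking call standing in for the legacy API.
    private static int BlockingLookup(int productId)
    {
        int now = Interlocked.Increment(ref current);
        int seen;
        while (now > (seen = Volatile.Read(ref maxSeen)) &&
               Interlocked.CompareExchange(ref maxSeen, now, seen) != seen) { }
        Thread.Sleep(50);                   // simulate the network round trip
        Interlocked.Decrement(ref current);
        return productId * 10;              // pretend "available quantity"
    }

    public static async Task ProcessAllAsync(IEnumerable<int> ids, Action<int, int> report,
                                             CancellationToken token)
    {
        var tasks = ids.Select(id => ProcessOneAsync(id, report, token));
        await Task.WhenAll(tasks);
    }

    private static async Task ProcessOneAsync(int id, Action<int, int> report, CancellationToken token)
    {
        await throttle.WaitAsync(token);    // waits asynchronously; no thread is blocked here
        try
        {
            // Only the blocking call itself occupies a thread-pool thread.
            int quantity = await Task.Run(() => BlockingLookup(id), token);
            report(id, quantity);           // runs on the caller's context (the UI thread, if called from one)
        }
        finally
        {
            throttle.Release();
        }
    }
}
```

When ProcessAllAsync is awaited from a UI event handler, the code after each await Task.Run resumes on the UI thread, so report can update UI controls directly as each product finishes.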
Stephen Cleary
  • What's the difference between using AsParallel and Parallel.For in this case? – whoami Jan 21 '15 at 16:19
  • @johnsmith Look at the documentation for each method to see what they can do, and how they can be used. – Servy Jan 21 '15 at 16:24
  • @johnsmith: `Parallel` and Parallel LINQ are very similar, but one is slightly better than the other in different scenarios. `Parallel` is "nicer" in terms of CPU usage/over-parallelism, whereas Parallel LINQ is easier to use with sequence operations (e.g., `Select`) and aggregation (e.g., `Average`). There's a great comparison writeup [here](http://blogs.msdn.com/b/pfxteam/archive/2010/04/21/when-to-use-parallel-foreach-and-when-to-use-plinq.aspx). – Stephen Cleary Jan 21 '15 at 16:27
  • @StephenCleary Just to be sure I understand your proposal: the logic for waiting on individual tasks and reporting results will be moved to the MyUIEventHandler method. Is this correct? You have a comment in the Start method, so I just want to be sure I'm not misunderstanding anything. – whoami Jan 21 '15 at 20:11
  • @johnsmith: If "reporting results" interacts with the UI, then yes, that would be the most appropriate. In that case, `Start` should probably return `ProductSearchResult[]`. – Stephen Cleary Jan 21 '15 at 20:19
  • @StephenCleary In that case, should the Start method return some sort of Task[], so that I can show progress in the UI as each Task completes? – whoami Jan 21 '15 at 20:22
  • @johnsmith: If you need to update the UI as each task completes, then you're probably better off using `Task.Run` instead of `AsParallel`. – Stephen Cleary Jan 21 '15 at 21:06

am I creating any unnecessary threads that could have been avoided without blocking calling thread as we will use this code in UI.

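Yes. Your code ties up threads twice: Parallel.For runs the loop body on multiple threads (the calling thread plus thread-pool threads), and Task.Run inside RetrieveProductAvailablity queues the blocking call to the thread pool again. There is no need for both.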

async-await and Parallel.For don't really play nicely together: Parallel.For only accepts an Action<int>, so your async lambda is converted into an async void method instead of an async Task one.
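A minimal, self-contained console sketch of the async void problem (the class and method names are made up for illustration). Because Parallel.For only accepts Action<int>, each iteration hands control back to Parallel.For at its first incomplete await, so Parallel.For returns before any of the work has finished. A TaskCompletionSource gate makes the outcome deterministic.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncLambdaDemo
{
    // Returns how many iterations had finished at the moment Parallel.For returned.
    public static int FinishedWhenParallelForReturned(int n)
    {
        if (n <= 0) throw new ArgumentOutOfRangeException(nameof(n));
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        var allDone = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        int finished = 0;

        // The lambda is compiled as async void (Action<int>): Parallel.For cannot await it.
        Parallel.For(0, n, async i =>
        {
            await gate.Task;                             // incomplete, so the lambda returns here
            if (Interlocked.Increment(ref finished) == n)
                allDone.SetResult(true);                 // last iteration signals completion
        });

        int snapshot = Volatile.Read(ref finished);      // Parallel.For has already returned
        gate.SetResult(true);                            // let the pending continuations run
        allDone.Task.Wait();                             // wait for them before returning
        return snapshot;
    }
}
```

FinishedWhenParallelForReturned(5) should return 0: Parallel.For had already returned while every iteration was still waiting. An exception thrown after the await would also not propagate out of Parallel.For; with no synchronization context it is rethrown on the thread pool, which typically crashes the process.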

What I would recommend is to drop both the Parallel.For and the Task.Run wrapper around the sync call, and do the following:

Change your method from async to sync (as it really isn't async at all):

private ProductSearchResult RetrieveProductAvailablity(int productId,
                                                       CancellationToken
                                                       cancellationToken)

Instead of this:

bool success = await Task.Run(() => app.RetrieveProductAvailability(productId));

Invoke the method call synchronously:

bool success = app.RetrieveProductAvailability(productId);

And then explicitly invoke Task.Run on all of them:

var productTasks = products.Select(product => Task.Run(() =>
                                   RetrieveProductAvailablity(product.ID, cts.Token)));

await Task.WhenAll(productTasks);
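A minimal sketch of this approach, assuming a hypothetical BlockingLookup in place of the legacy call. The lookup delays are chosen so that later IDs finish first, which shows that Task.WhenAll returns results in the order of the input tasks rather than in completion order. Note that nothing here limits how many blocking calls run at once; every Task.Run is queued to the thread pool immediately.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class WhenAllSketch
{
    // Hypothetical blocking lookup; higher IDs sleep less, so they finish first.
    private static int BlockingLookup(int productId)
    {
        Thread.Sleep(Math.Max(0, 100 - 20 * productId));
        return productId;
    }

    public static async Task<int[]> RetrieveAllAsync(int[] ids)
    {
        // One Task.Run per product; the lookup itself stays synchronous.
        Task<int>[] tasks = ids.Select(id => Task.Run(() => BlockingLookup(id))).ToArray();
        // WhenAll's result array follows the order of 'tasks', whatever the completion order.
        return await Task.WhenAll(tasks);
    }
}
```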

Generally, it's not recommended to expose async wrappers over sync methods.

Yuval Itzchakov
  • So what approach do you suggest if I want a responsive UI in this scenario? Can I use a regular for loop in place of Parallel.For while keeping it thread-safe? – whoami Jan 21 '15 at 16:08