3

I'm now at the point where I need to decide how to build my async code. From my function Func1(), I need to call 20 different web services at once and, when they have all returned an XML answer, join all the results into one big XML.

I thought about using TPL tasks, something like this:

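var task1 = Task.Factory.StartNew(() => { /* call web service 1... */ });
var task2 = Task.Factory.StartNew(() => { /* call web service 2... */ });
var task3 = Task.Factory.StartNew(() => { /* call web service 3... */ });

// WaitAll is a static method on Task and takes the tasks to wait for.
Task.WaitAll(task1, task2, task3);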

Does this sound reasonable, or is there a better way to get the job done?

svick
Itay.B
  • 3
    yes there is a better way - make the calls async - right now you do them parallel but block up to 3(or 20?) threads while waiting for the result - if you make async-calls you won't block any – Random Dev Jan 16 '13 at 20:37
  • 1
    I wouldn't say `async` is necessarily *better*, rather, it's different, and its superiority will depend on the circumstances. In this case, "call the web service" is presumably a rather light operation, so the overhead of a thread (`Task.Factory.StartNew`) probably isn't justified. – Snixtor Jan 16 '13 at 22:07
  • As per the question, the output has to be the combined XML, so the caller has to wait until every request completes. At most, the whole process can be pushed to a background thread. Since it is already parallel, there is nothing more to do for that part. I don't think going fully async will work here. – kunjee Jan 24 '13 at 07:43

4 Answers

3

We needed something like this a couple of months ago, to process multiple remote URLs concurrently. We implemented this by deriving our own class from the SemaphoreSlim class.

You could implement something like this:

    /// <summary>
    /// Can be used to process multiple URLs concurrently.
    /// </summary>
    public class ConcurrentUrlProcessor : SemaphoreSlim
    {
        private int initialCount;
        private int maxCount;
        private readonly HttpClient httpClient;

        /// <summary>
        /// Initializes a new instance of the <see cref="ConcurrentUrlProcessor" /> class, specifying the initial number of requests that can be granted concurrently.
        /// </summary>
        /// <param name="initialCount">The initial number of requests for the semaphore that can be granted concurrently.</param>
        public ConcurrentUrlProcessor(int initialCount)
            :base(initialCount)
        {
            this.initialCount = initialCount;
            this.maxCount = int.MaxValue;
            this.httpClient = new HttpClient();
        }
        /// <summary>
        /// Initializes a new instance of the <see cref="ConcurrentUrlProcessor" /> class, specifying the initial and maximum number of requests that can be granted concurrently.
        /// </summary>
        /// <param name="initialCount">The initial number of requests for the semaphore that can be granted concurrently.</param>
        /// <param name="maxCount">The maximum number of requests for the semaphore that can be granted concurrently.</param>
        public ConcurrentUrlProcessor(int initialCount, int maxCount)
            : base(initialCount, maxCount) 
        {
            this.initialCount = initialCount;
            this.maxCount = maxCount;
            this.httpClient = new HttpClient();
        }
        /// <summary>
        /// Starts the processing.
        /// </summary>
        /// <param name="urls">The urls.</param>
        /// <returns>Task{IEnumerable{XDocument}}.</returns>
        public virtual async Task<IEnumerable<XDocument>> StartProcessing(params string[] urls)
        {
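            List<Task> tasks = new List<Task>();
            // ConcurrentBag (System.Collections.Concurrent) because the tasks below add
            // results from several threads at once; List<T> is not thread-safe for that.
            ConcurrentBag<XDocument> documents = new ConcurrentBag<XDocument>();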

            SemaphoreSlim throttler = new SemaphoreSlim(initialCount, maxCount);
            foreach (string url in urls)
            {
                await throttler.WaitAsync();

                tasks.Add(Task.Run(async () =>
                {
                    try
                    {
                        string xml = await this.httpClient.GetStringAsync(url);

                        //move on to the next page if no xml is returned.
                        if (string.IsNullOrWhiteSpace(xml))
                            return;

                        var document = XDocument.Parse(xml);
                        documents.Add(document);
                    }
                    catch (Exception)
                    {
                        //TODO: log the error or do something with it.
                    }
                    finally
                    {
                        throttler.Release();
                    }
                }));
            }

            await Task.WhenAll(tasks);

            return documents;
        }
    }

And the corresponding unit test:

    [Test]
    public async Task CanProcessMultipleUrlsTest()
    {
        string[] urls = new string[] 
        {
            "http://google.nl",
            "http://facebook.com",
            "http://linkedin.com",
            "http://twitter.com" 
        };

        IEnumerable<XDocument> documents = null;
        ConcurrentUrlProcessor processor = new ConcurrentUrlProcessor(100);

        documents = await processor.StartProcessing(urls);

        Assert.AreEqual(4, documents.Count());
    }
Rob Angelier
2

There are two approaches that come to my mind.

A. The way you are doing it now, but using continuations (ContinueWith/ContinueWhenAll) as described in this answer and in this article. For your case you might use a single continuation with child tasks:

TaskCreationOptions op = TaskCreationOptions.AttachedToParent;
Task.Factory.StartNew(() =>
{
    // Attached children: the parent task completes only after all of them finish.
    var task1 = Task.Factory.StartNew(() => CallService(1), op);
    var task2 = Task.Factory.StartNew(() => CallService(2), op);
    var task3 = Task.Factory.StartNew(() => CallService(3), op);
})
.ContinueWith(ant => { SomeOtherDelegate(); });

Or you could chain the continuations as explained here.

Another way is to use ContinueWhenAll.

var task1 = Task.Factory.StartNew(() => CallService(1));
var task2 = Task.Factory.StartNew(() => CallService(2));
var task3 = Task.Factory.StartNew(() => CallService(3));
var continuation = Task.Factory.ContinueWhenAll(
    new[] { task1, task2, task3 }, tasks => Console.WriteLine("Done!"));

The only thing left to think about here is how to handle a variable number of tasks, but this is easy and I will let you work that one out.
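
One way to handle a variable number of tasks, as a minimal sketch rather than a definitive implementation: build the task array with LINQ and pass it to ContinueWhenAll. The class name and the string-returning CallService below are hypothetical stand-ins for the real web service calls.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class VariableTaskCount
{
    // Hypothetical stand-in for a real web service call.
    public static string CallService(int serviceNumber)
    {
        return "<result service=\"" + serviceNumber + "\" />";
    }

    public static Task<string[]> CallAllServices(int serviceCount)
    {
        // Build one task per service instead of declaring task1, task2, ...
        Task<string>[] tasks = Enumerable.Range(1, serviceCount)
            .Select(n => Task.Factory.StartNew(() => CallService(n)))
            .ToArray();

        // The continuation runs once every task in the array has completed.
        return Task.Factory.ContinueWhenAll<string, string[]>(
            tasks,
            completed => completed.Select(t => t.Result).ToArray());
    }
}
```

The results come back in the same order as the task array, so `CallAllServices(20).Result[0]` would hold the answer from service 1.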

B. The other way is to use .NET 4.5+ and async/await, so your code would be something like

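private async Task CallAllServicesAsync()
{
    // Start all calls first so they run concurrently, then await them together.
    // Awaiting each call in turn would run them one after another.
    await Task.WhenAll(
        CallServiceAsync(1),
        CallServiceAsync(2),
        CallServiceAsync(3));
}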

where

private Task CallServiceAsync(int serviceNumber)
{
    return Task.Run(() => { SomeMethod(); });
}

The above is equivalent to the first code shown, provided the calls are started together rather than awaited one at a time, and the framework takes care of the continuations for you under the hood.

I hope this helps.

MoonKnight
1

I can think of only 2 principal approaches here:

  1. Aggregate each result as soon as it is received, with something like the ContinueWith() method. However, you would need to handle synchronization in your aggregation code, and you would still need to wait until all tasks have finished. This approach therefore only makes sense if the aggregation takes a really long time and can be done in parallel.

  2. The way you do it - nice and simple :)
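
The first option could look roughly like the following. This is only a sketch under stated assumptions: CallService, the class name, and the "results" root element are hypothetical, and a plain lock stands in for whatever synchronization the real aggregation needs.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Xml.Linq;

public static class IncrementalAggregation
{
    // Hypothetical stand-in for a real web service call.
    public static XElement CallService(int serviceNumber)
    {
        return new XElement("result", new XAttribute("service", serviceNumber));
    }

    public static XDocument CallAndAggregate(int serviceCount)
    {
        var root = new XElement("results"); // hypothetical root element name
        var gate = new object();

        Task[] continuations = Enumerable.Range(1, serviceCount)
            .Select(n => Task.Factory
                .StartNew(() => CallService(n))
                // Merge each result as soon as its call finishes.
                .ContinueWith(t =>
                {
                    // Continuations may run on different threads,
                    // so guard the shared tree.
                    lock (gate) { root.Add(t.Result); }
                }))
            .ToArray();

        // Even here, every continuation has to finish before the
        // aggregate can be used.
        Task.WaitAll(continuations);
        return new XDocument(root);
    }
}
```

Note that the order of the child elements depends on which call finishes first.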

As for using async, I would vote for the TaskFactory.ContinueWhenAll() method. This way you will not block any threads, the code will look nicer than with multiple awaits (a matter of taste), and it will likely have less overhead (this may depend on the implementation).

ironic
0

The proper way to use async is to first define a CallServiceAsync that is naturally asynchronous (i.e., uses HttpClient or is a TaskFactory.FromAsync wrapper around Begin/End methods). It should not use Task.Run.
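
As an illustration only, a naturally asynchronous CallServiceAsync built on HttpClient might look like the sketch below. The ServiceClient class, the UrlFor helper, and the example.com addresses are assumptions made for the example; the real services will have their own URLs and possibly their own request format.

```csharp
using System.Net.Http;
using System.Threading.Tasks;
using System.Xml.Linq;

public static class ServiceClient
{
    // A single shared HttpClient; it supports concurrent requests.
    private static readonly HttpClient Client = new HttpClient();

    // Hypothetical URL scheme; substitute the real service addresses.
    public static string UrlFor(int serviceNumber)
    {
        return "http://example.com/service" + serviceNumber;
    }

    // No Task.Run here: the await hands the thread back
    // while the request is in flight.
    public static async Task<XDocument> CallServiceAsync(int serviceNumber)
    {
        string xml = await Client.GetStringAsync(UrlFor(serviceNumber));
        return XDocument.Parse(xml);
    }
}
```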

Once you have a naturally-asynchronous CallServiceAsync, then you can issue 20 simultaneous calls and (asynchronously) wait for them as such:

public async Task<XDocument> GetAllDataAsync()
{
  var task1 = CallServiceAsync(1);
  var task2 = CallServiceAsync(2);
  var task3 = CallServiceAsync(3);
  ...
  XDocument[] results = await Task.WhenAll(task1, task2, task3, ...);
  return JoinXmlDocuments(results);
}

This approach will not block any threads at all.
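
JoinXmlDocuments is not defined above. One possible shape for it, as a hedged sketch: copy each document's root element under a common wrapper. The XmlJoiner class and the "results" element name are hypothetical; pick whatever structure the combined XML actually needs.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Xml.Linq;

public static class XmlJoiner
{
    public static XDocument JoinXmlDocuments(IEnumerable<XDocument> documents)
    {
        // "results" is a hypothetical wrapper element name.
        var root = new XElement("results",
            documents
                .Where(d => d != null && d.Root != null)
                // Copy each root so the source documents stay intact.
                .Select(d => new XElement(d.Root)));
        return new XDocument(root);
    }
}
```

Because Task.WhenAll returns the results in the order the tasks were passed in, the joined document keeps the services in call order.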

You can make it slightly more performant by using ConfigureAwait(false) as such:

XDocument[] results = await Task.WhenAll(task1, task2, task3, ...).ConfigureAwait(false);
Stephen Cleary