2

I am trying to parse data from several websites continuously. I would like this action to be preformed individually in a loop in an asynchronous manner until the program is closed. I am not sure what the structure should be for this kind of logic.

Right now I am following this pattern.

async public void ParseAll(List<Site> SiteList)
{
    List<Task> TaskList = new List<Task>();

    foreach(Site s in SiteList)
    {
        TaskList.Add(s.ParseData);
    }

    await Task.WhenAll(TaskList)
}

The issue is that if I construct a Loop around this method then the sites that are updated first will have to wait until the whole list is finished before the method can run again. Theoretically, what I would like to do is just put each site back on the bottom of the TaskList when it finished its ParseData method but I am not sure if thats possible, or if thats the best way.

noseratio
  • 59,932
  • 34
  • 208
  • 486
cubesnyc
  • 1,385
  • 2
  • 15
  • 32
  • 2
    **Do not use `async void` methods unless they are event handlers. Use `async Task` for async methods that do not return results.** – Daniel Mann Jun 03 '14 at 17:10
  • How many sites are there and how often do you want to update? – Linky Jun 03 '14 at 17:11
  • @DanielMann Considering the method is designed to never ever return, getting a `Task` that will never complete isn't really any more useful than a `void` method. – Servy Jun 03 '14 at 17:28
  • 2
    AFAIK that's not true as exception behavior is different as async void will go up the call stack if an exception occured in a continuation. – Linky Jun 03 '14 at 17:36
  • 1
    @Servy Linky is correct. Exceptions are handled differently when they occur in `async void` methods. Hit up your search engine of choice and take a look -- unhandled exceptions in `async void` methods are rethrown on the thread pool and cause all sorts of nasty behavior. – Daniel Mann Jun 03 '14 at 17:46

4 Answers4

3

Theoretically, what I would like to do is just put each site back on the bottom of the TaskList when it finished its ParseData

Looks like you need to maintain a queue of sites to be processed. Below is my take on this, using SemaphoreSlim. This way you can also limit the number of concurrent tasks to be less than the actual number of sites, or add new sites on-the-fly. A CancellationToken is used to stop the processing from outside. The use of async void is justified here IMO, QueueSiteAsync keeps track of the tasks it starts.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncLoop
{
    class Program
    {
        public class Site
        {
            public string Url { get; set; }
            public async Task ParseDataAsync(CancellationToken token)
            {
                // simulate download and parse
                int delay = new Random(Environment.TickCount).Next(100, 1000);
                await Task.Delay(delay, token);
                Console.WriteLine("Processed: #{0}, delay: {1}", this.Url, delay);
            }
        }

        object _lock = new Object();
        HashSet<Task> _pending = new HashSet<Task>(); // sites in progress
        SemaphoreSlim _semaphore;

        async void QueueSiteAsync(Site site, CancellationToken token)
        {
            Func<Task> processSiteAsync = async () =>
            {
                await _semaphore.WaitAsync(token).ConfigureAwait(false);
                try 
                {           
                    await site.ParseDataAsync(token);
                    QueueSiteAsync(site, token);
                }
                finally
                {
                    _semaphore.Release();
                }
            };

            var task = processSiteAsync();
            lock (_lock)
                _pending.Add(task);
            try
            {
                await task;
                lock (_lock)
                    _pending.Remove(task);
            }
            catch
            {
                if (!task.IsCanceled && !task.IsFaulted)
                    throw; // non-task error, re-throw

                // leave the faulted task in the pending list and exit
                // ProcessAllSites will pick it up
            }
        }

        public async Task ProcessAllSites(
            Site[] sites, int maxParallel, CancellationToken token)
        {
            _semaphore = new SemaphoreSlim(Math.Min(sites.Length, maxParallel));

            // start all sites
            foreach (var site in sites)
                QueueSiteAsync(site, token);

            // wait for cancellation
            try
            {
                await Task.Delay(Timeout.Infinite, token);
            }
            catch (OperationCanceledException)
            {
            }

            // wait for pending tasks
            Task[] tasks;
            lock (_lock)
                tasks = _pending.ToArray();
            await Task.WhenAll(tasks);
        }

        // testing
        static void Main(string[] args)
        {
            // cancel processing in 10s
            var cts = new CancellationTokenSource(millisecondsDelay: 10000); 
            var sites = Enumerable.Range(0, count: 10).Select(i => 
                new Site { Url = i.ToString() });
            try
            {
                new Program().ProcessAllSites(
                    sites.ToArray(), 
                    maxParallel: 5, 
                    token: cts.Token).Wait();
            }
            catch (AggregateException ex)
            {
                foreach (var innerEx in ex.InnerExceptions)
                    Console.WriteLine(innerEx.Message);
            }
        }
    }
}

You may also want to separate download and parsing into separate pipelines, check this for more details.

Community
  • 1
  • 1
noseratio
  • 59,932
  • 34
  • 208
  • 486
1

It's easy enough to create a method to loop continuously and parse a single site over and over again. Once you have that method, you can call it once on each site in the list:

private async void ParseSite(Site s)
{
    while (true)
    {
        await s.ParseData();
    }
}

public void ParseAll(List<Site> siteList)
{
    foreach (var site in siteList)
    {
        ParseSite(site);
    }
}
Servy
  • 202,030
  • 26
  • 332
  • 449
  • Is there a way to extend this method to have a concurrency? – cubesnyc Jun 03 '14 at 17:51
  • @YevgenyYurgenson It's *already* going to do the work concurrently. No extension is necessary. – Servy Jun 03 '14 at 17:53
  • @Servy: Doesn't this depend on the SynchronizationContext? AFAIK, when running from a UI Thread the synchronous code (which can take some time while parsing) will run sequentially (although "interlocked") – Linky Jun 03 '14 at 21:05
  • @Linky Any CPU bound work would need to run sequentially, sure. But the whole idea here is that the work is asynchronous, so while two site's parser can't be checking their loop condition at the same time, one request can be doing its small amount of CPU bound processing while others are waiting asynchronously. – Servy Jun 03 '14 at 21:09
  • Thanks. It's just nitpicking: while the download itself should be async, the method is called ParseSite which I'd expect to be CPU bound as well and as he explicitly asked for concurrency... – Linky Jun 03 '14 at 21:15
  • @Linky If it's CPU bound synchronous work then it shouldn't be returning a `Task` in the first place. If it's CPU bound work that's already offloaded to another task then nothing needs to be done. This solution *does* add concurrency. It adds it by the omition of an `await` on the individual operations. The result of starting N asynchronous operations without waiting on any of them is that they run concurrently. Nothing else needs to be done. – Servy Jun 03 '14 at 21:22
  • (not sure if this comment area is still the right place...please tell me if it gets off-topic or you're tired of me nagging). Again the nitpicking: Especially when reading async you're pretty much always doing some calculation on the data you read (why else would you have read it?). And is it (theoretically) correct to say it runs concurrent (several at the same time) when most parts of the code have to run sequentially? I'm asking because at work I'm trying to get devs off the idea that async-await==concurrency cause their .net code (at least in UI) never runs concurrently. Am I wrong? – Linky Jun 03 '14 at 21:48
  • 1
    @Linky Yes, because you're making the inherent assumption that the majority of the work is CPU bound work, despite the fact that there is no such indication that it is the case. [There is no need to use multiple threads to create parallelism](http://stackoverflow.com/a/23833635/1159478). Doing actions asynchronously, and not waiting on them before starting other actions, creates parallelism.. – Servy Jun 04 '14 at 14:17
  • This is a very elegant solution that illustrates exactly what I am trying to do, but it raises several logic questions. If I adopt this pattern, and after parsing one of the sites, I decide to perform some action based on that result, how would I then tell all the other sites to pause? – cubesnyc Jun 05 '14 at 18:30
  • @YevgenyYurgenson Well, that's entirely beyond the scope of the question, but it's still easy enough to do. Just pass a `SemaphoreSlim` to the method, wait/release it on each iteration, and then this allows anyone with access to the semaphore to pause the iteration of the queue for as long as they are holding the semaphore. – Servy Jun 05 '14 at 18:34
0

If you want to visit the site again as soon as it is complete, you probably want to use Task.WhenAny and integrate your outer loop with your inner loop, something like this (assuming the ParseData function will return the Site it is parsing for):

async public void ParseAll(List<Site> SiteList)
{
    while (true)
    {
        List<Task<Site>> TaskList = new List<Task<Site>>();

        foreach(Site s in SiteList)
        {
            TaskList.Add(s.ParseData());
        }

        await Task.WhenAny(TaskList);
        TaskList = TaskList.Select(t => t.IsCompleted ? t.Result.ParseData() : t).ToList();
    }
}
antlersoft
  • 14,636
  • 4
  • 35
  • 55
-1

Did you tried the PLinq lib?

Plinq allows you to execute linq querys async.

In your case it would look like:

SiteList.AsParallel().ForEach(s => s.ParseData);

Venson
  • 1,772
  • 17
  • 37
  • He doesn't want to call this on each site once, he wants to have each site continue to loop, parsing the data each time it finishes. – Servy Jun 03 '14 at 17:08