3

Attempting to write a HTML crawler using the Async CTP I have gotten stuck as to how to write a recursion free method for accomplishing this.

This is the code I have so far.

private readonly ConcurrentStack<LinkItem> _LinkStack;
private readonly Int32 _MaxStackSize;
private readonly WebClient client = new WebClient();

Func<string, string, Task<List<LinkItem>>> DownloadFromLink = async (BaseURL, uri) => 
{
    string html = await client.DownloadStringTaskAsync(uri);
    return LinkFinder.Find(html, BaseURL);
};

Action<LinkItem> DownloadAndPush = async (o) => 
{
    List<LinkItem> result = await DownloadFromLink(o.BaseURL, o.Href);
    if (this._LinkStack.Count() + result.Count <= this._MaxStackSize)
    {
        this._LinkStack.PushRange(result.ToArray());
        o.Processed = true;
    }  
};

Parallel.ForEach(this._LinkStack, (o) => 
{
    DownloadAndPush(o);
});

But obviously this doesn't work as I would hope because at the time that Parallel.ForEach executes the first (and only iteration) I only have only 1 item. The simplest approach I can think of to make the ForEach recursive but I can't (I don't think) do this as I would quickly run out of stack space.

Could anyone please guide me as to how I can restructure this code, to create what I would describe as a recursive continuation that adds items until either the MaxStackSize is reached or the system runs out of memory?

Malachi
  • 3,205
  • 4
  • 29
  • 46
Maxim Gershkovich
  • 45,951
  • 44
  • 147
  • 243

1 Answers1

10

I think the best way to do something like this using C# 5/.Net 4.5 is to use TPL Dataflow. There even is a walkthrough on how to implement web crawler using it.

Basically, you create one "block" that takes care of downloading one URL and getting the link from it:

var cts = new CancellationTokenSource();

Func<LinkItem, Task<IEnumerable<LinkItem>>> downloadFromLink =
    async link =>
            {
                // WebClient is not guaranteed to be thread-safe,
                // so we shouldn't use one shared instance
                var client = new WebClient();
                string html = await client.DownloadStringTaskAsync(link.Href);

                return LinkFinder.Find(html, link.BaseURL);
            };

var linkFinderBlock = new TransformManyBlock<LinkItem, LinkItem>(
    downloadFromLink,
    new ExecutionDataflowBlockOptions
    { MaxDegreeOfParallelism = 4, CancellationToken = cts.Token });

You can set MaxDegreeOfParallelism to any value you want. It says at most how many URLs can be downloaded concurrently. If you don't want to limit it at all, you can set it to DataflowBlockOptions.Unbounded.

Then you create one block that processes all the downloaded links somehow, like storing them all in a list. It can also decide when to cancel downloading:

var links = new List<LinkItem>();

var storeBlock = new ActionBlock<LinkItem>(
    linkItem =>
    {
        links.Add(linkItem);
        if (links.Count == maxSize)
            cts.Cancel();
    });

Since we didn't set MaxDegreeOfParallelism, it defaults to 1. That means using collection that is not thread-safe should be okay here.

We create one more block: it will take a link from linkFinderBlock, and pass it both to storeBlock and back to linkFinderBlock.

var broadcastBlock = new BroadcastBlock<LinkItem>(li => li);

The lambda in its constructor is a "cloning function". You can use it to create a clone of the item if you want to, but it shouldn't be necessary here, since we don't modify the LinkItem after creation.

Now we can connect the blocks together:

linkFinderBlock.LinkTo(broadcastBlock);
broadcastBlock.LinkTo(storeBlock);
broadcastBlock.LinkTo(linkFinderBlock);

Then we can start processing by giving the first item to linkFinderBlock (or broadcastBlock, if you want to also send it to storeBlock):

linkFinderBlock.Post(firstItem);

And finally wait until the processing is complete:

try
{
    linkFinderBlock.Completion.Wait();
}
catch (AggregateException ex)
{
    if (!(ex.InnerException is TaskCanceledException))
        throw;
}
svick
  • 236,525
  • 50
  • 385
  • 514
  • Wow! Thank you for the brilliant explanation. Could you just confirm one thing? If we set MaxDegreeOfParallelism to a number > 1 does this mean I need change the collection type to something like the ConcurrentStack for it to be thread safe? – Maxim Gershkovich Feb 15 '12 at 08:59
  • Do you mean the collection in `storeBlock`? And where do you set `MaxDegreeOfParallelism`? If you set `MDOP` of `storeBlock` to > 1, then, yes, you would need to use some thread-safe collection there (or use locks). But if you set `MDOP` of some other block to > 1, it doesn't affect the parallelism of `storeBlock`, so you don't need to consider thread-safety. – svick Feb 15 '12 at 11:54
  • Boy this is going to make me upgrade to 2012! +1 – toddmo Dec 14 '13 at 03:29