1

I want to create a web crawler that will download the page located at some URL, search for some elements and then create a result for it, which will be ready to save to DB. But I want this DB part to be saved in batches.

The last part is, what makes this whole exercise a little bit harder (at lease for my current understanding of TPL Dataflow, which has 1 day history;) ) I know, that there is BatchBlock element but the scenario i saw it in, was simple, where it was first step and was "batching" the input given in the application (not the internal pipeline work) And i've tried to put somewhere inside the pipeline this batching part, but i am either forced to pass a list of urls to the first step (and then the download url phase will be one step, and other steps will be waiting till this one is finished) or i can pass one url to the pipeline, but then there is noting to batch as from 1 url there is one parsing element to save to DB :)

This is what i want to achieve:

enter image description here

What is important of course, that each download url is "independant" from other "download url" action. So once some page is downloaded it can instantly go to the webscraping part. At once this is ready, it can instantly go to the phase of saving in DB (so waiting till batch of x elements comes - for example - 5) and then save it to DB.

Of course, I don't have to mention, that both "Download url" and "Webscrape neccessary data" transformation are async operations.

Maybe this is not something you can solve with TPL Dataflow? Please advice :)

[UPDATE - 07.08.2020 13:25]

Ok, yesterday I made a false assumption, that I post only one thing in the pipeline as the signature takes one string. That was clearly wrong assumption as I can just call it several times :)

I have more or less working examples, but two things are missing. Changing it to async and how to flush BatchBlock. Because if I have BatchBlock of size 3 and I send it to pipeline 8 URLs, I get a response only from the first 6.

Another issue with this example is .... that even without the need to flush (so i am sending 9 URLs and BatchBlock is 3) still the program runs indefinitely. Where is the issue?

Console.WriteLine($"Processing started: {DateTime.Now.ToString()}");
var workBuffer = new BatchBlock<string>(3);
var downloadUrl = new TransformBlock<string, string>(url =>
{
    Thread.Sleep(int.Parse(url.Last().ToString()) * 1000);
    return url;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

var parseContent = new TransformBlock<string, string>(content =>
{
    Thread.Sleep(int.Parse(content.Last().ToString()) * 1000 / 2);
    return $"parsing result for: {content}";
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

var saveToDb = new TransformBlock<string[], bool>(results =>
{
    Console.WriteLine($"results: {DateTime.Now.ToString()} {String.Join(", ", results)}");
    return true;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

downloadUrl.LinkTo(parseContent, new DataflowLinkOptions
{
    PropagateCompletion = true
});
parseContent.LinkTo(workBuffer, new DataflowLinkOptions
{
    PropagateCompletion = true
});
workBuffer.LinkTo(saveToDb, new DataflowLinkOptions
{
    PropagateCompletion = true
});

downloadUrl.Completion.ContinueWith(obj => parseContent.Complete());
parseContent.Completion.ContinueWith(obj => workBuffer.Complete());
workBuffer.Completion.ContinueWith(obj => saveToDb.Complete());

//last digit in string is treated as url download time (in seconds) and half of it is for processing time.  
downloadUrl.Post("http://some_site_to_parse.com2"); //downoading for this url is 2 sec, processing 1 sec. It will be ready to save to DB after 3 sec
downloadUrl.Post("http://some_site_to_parse.com3"); //downoading for this url is 3 sec, processing 1,5 sec. It will be ready to save to DB after 4,5 sec
downloadUrl.Post("http://some_site_to_parse.com4"); //downoading for this url is 4 sec, processing 2 sec. It will be ready to save to DB after 6 sec
//here should first batch be saved to DB after 6 seconds
downloadUrl.Post("http://some_site_to_parse.com5"); //downoading for this url is 5 sec, processing 2,5 sec. It will be ready to save to DB after 7,5 sec
downloadUrl.Post("http://some_site_to_parse.com6"); //downoading for this url is 6 sec, processing 3 sec. It will be ready to save to DB after 9 sec
downloadUrl.Post("http://some_site_to_parse.com7"); //downoading for this url is 7 sec, processing 3,5 sec. It will be ready to save to DB after 10,5 sec
//here should second batch be saved to DB after 10,5 seconds
downloadUrl.Post("http://some_site_to_parse.com8"); //downoading for this url is 8 sec, processing 4 sec. It will be ready to save to DB after 12 sec
downloadUrl.Post("http://some_site_to_parse.com9"); //downoading for this url is 9 sec, processing 4,5 sec. It will be ready to save to DB after 13,5 sec
downloadUrl.Post("http://some_site_to_parse.com10"); //downoading for this url is 10 sec, processing 5 sec. It will be ready to save to DB after 15 sec
//here should third batch be saved to DB after 15 seconds

downloadUrl.Complete();
saveToDb.Completion.Wait();

To summarize three questions:

  1. How to flush BatchBlock
  2. Why is this example app running indefinitely
  3. How to make it Async

[UPDATE 2 - 07.08.2020 14:28]

Somebody suggested that this is the solution to my problem: TPL Dataflow Transform block post to batch block followed by actionblock

But i've added all the , new DataflowLinkOptions { PropagateCompletion = true } and have added the workBuffer.Completion.ContinueWith(obj => saveToDb.Complete()); and it is still not working

Piotr
  • 1,155
  • 12
  • 29
  • Do you have any code to show your attempt? – TimTIM Wong Aug 07 '20 at 00:15
  • There was a lot of information here, but at the same time i was left feeling empty. I assume you have blocks for DownloadUrl, and Scrape which can run in parallel and can be unordered. Just feed them into a BatchBlock then an ActionBlock. Which part exactly are you having trouble with? – TheGeneral Aug 07 '20 at 00:15
  • Does this answer your question? [TPL Dataflow Transform block post to batch block followed by actionblock](https://stackoverflow.com/questions/31804794/tpl-dataflow-transform-block-post-to-batch-block-followed-by-actionblock) – TimTIM Wong Aug 07 '20 at 00:18
  • OK -- I'm done editing my answer. It seems to work really fast.. Hopefully it helps you out. These things are a lot of fun, I wish I could use them more in my daily work. – Andy Aug 07 '20 at 02:05
  • @TimTimWong and the rest of the guys - i did clarify the question and added some example code with question for it. – Piotr Aug 07 '20 at 11:54
  • 1
    Try configuring **all** links with `PropagateCompletion = true`, not only the last one. – Theodor Zoulias Aug 07 '20 at 11:58
  • Also by configuring the blocks with `MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded` you are requesting unlimited concurrency. You are going to bombard all remote servers with requests at once, and get bombarded with the responses. I hope you know what you are doing. – Theodor Zoulias Aug 07 '20 at 12:02
  • @TheodorZoulias - did try. Result the same. You can copy paste application and test it in LinqPad or anywhere else;) – Piotr Aug 07 '20 at 12:04
  • @TheodorZoulias This is only an example, but in the final solution i will have 20 sites. Don't worry, i will not kill the internet;) – Piotr Aug 07 '20 at 12:05
  • 1
    The `BatchBlock` eventually flushes itself when it is marked as completed. Which should happen automatically if the conditions below are met: (1) you configure all links with `PropagateCompletion = true`, (2) you complete manually the first block of the pipeline (by calling `Complete`), and (3) the processing is done. – Theodor Zoulias Aug 07 '20 at 12:13
  • Yes, so in my case all the "Completion.ContinueWith()" are not neccessary. `PropagateCompletion = true` should be enough. But for some reason it is not. You can download the code and see it for yourself (or my Linqpad is crazy - that is an option as well) – Piotr Aug 07 '20 at 12:31
  • 1
    The reason that the `saveToDb` is never completed is because its output buffer is never emptied. You probably don't need a `TransformBlock` as that last block of the pipeline. An `ActionBlock` should be more suitable. If you do want to keep the `TransformBlock` for some reason, make sure that it is emptied somehow. For example by linking it to a null targer (`saveToDb.LinkTo(DataflowBlock.NullTarget())`). – Theodor Zoulias Aug 07 '20 at 14:39

2 Answers2

2

I think this does what you are trying to do...

First, create a client that is used by everyone:

private static readonly HttpClient _client = new HttpClient(new HttpClientHandler
{
    AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate
});

Then here is how I constructed the blocks and linked them up:

const int maxDegreeOfParalleism = 10;

// first in, first out buffer block
var uriInputBlock = new BufferBlock<Uri>();

// transform block will download the data to string
var downloadHttpDataBlock = new TransformBlock<Uri, string>(async uri =>
{
    using(var msg = new HttpRequestMessage(HttpMethod.Get, uri))
    using(var resp = await _client.SendAsync(msg, HttpCompletionOption.ResponseHeadersRead))
    {
        return await resp.Content.ReadAsStringAsync().ConfigureAwait(false);
    }
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParalleism });

// this block will take the data and scrape what it wants
var htmlScrapeBlock = new TransformBlock<string, string[]>(data =>
{
    var doc = new HtmlAgilityPack.HtmlDocument();
    doc.LoadHtml(data);
    return doc.DocumentNode.SelectNodes("//a[@href]").
        Select(x => x.GetAttributeValue("href", string.Empty)).ToArray();
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParalleism });

// take in arrays and send them out as single elements
var manyToOneBlock = new TransformManyBlock<string[], string>(x => x);

// output data to a batch block with grouping of 10
var outputDataBlcok = new BatchBlock<string>(10);

// final block to store it somewhere
var databaseBlock = new ActionBlock<string[]>(x =>
{
    Console.WriteLine($"Group of {x.Length} items to be processed:");
    foreach (var uri in x)
    {
        Console.WriteLine($"Store this: {uri}");
    }
});

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
uriInputBlock.LinkTo(downloadHttpDataBlock, linkOptions);
downloadHttpDataBlock.LinkTo(htmlScrapeBlock, linkOptions);
htmlScrapeBlock.LinkTo(manyToOneBlock, linkOptions);
manyToOneBlock.LinkTo(outputDataBlcok, linkOptions);
outputDataBlcok.LinkTo(databaseBlock, linkOptions);

uriInputBlock.Post(new Uri("https://stackoverflow.com"));
uriInputBlock.Post(new Uri("https://google.com"));
uriInputBlock.Post(new Uri("https://yahoo.com"));
uriInputBlock.Post(new Uri("https://example.com"));

// When you want to complete/close down the pipeline, call this
uriInputBlock.Complete();
// you can wait for all data to finish propogating by calling this:
databaseBlock.Completion.Wait();

This is just a basic concept, obviously you can make this much better, but it should get you started. More info on the many different blocks here.

Andy
  • 12,859
  • 5
  • 41
  • 56
  • Just FYI, this doesnt batch – TheGeneral Aug 07 '20 at 01:18
  • @TheGeneral -- if you add `MaxDegreeOfParallelism` to the `downloadHttpDataBlock`, would it? Or would i have to do that to each block? – Andy Aug 07 '20 at 01:19
  • I think the OP wants to batch the Db Save. Which would mean consolidating the results into a batchblock that when it reaches a threshold pushes down to the action block. Anyway, i think you captured the spirit of the question and the OP can extrapolate – TheGeneral Aug 07 '20 at 01:21
  • @TheGeneral -- oh that's a good idea. Then when you call "complete" would that flush that BatchBlock if it didn't meet the criteria? I can change the "outputDataBlock" to a "BatchBlock" with a threshold of like 10. – Andy Aug 07 '20 at 01:22
  • Yeah "I think it does", however there are other approaches to that problem https://stackoverflow.com/questions/52633346/tpl-dataflow-batching-on-duration-or-threshold/53171789#53171789 – TheGeneral Aug 07 '20 at 01:23
  • @TheGeneral -- Sorry to be a pest, but does that look good? I have it going from the "scrape" block (which outputs an array) to a "transformMany" which would then send each item in the array to a "batch" block (with a size of 10). – Andy Aug 07 '20 at 01:53
  • 1
    yeah, once again I think it fits the spirit of the question which I think was more about linking up the chain – TheGeneral Aug 07 '20 at 02:01
  • 1
    The initial `uriInputBlock` is redundant IMHO. You can just feed directly the `downloadHttpDataBlock`. Adding a buffer just bloats the source code and adds overhead. Another point of debate is configuring the `downloadHttpDataBlock` and the `htmlScrapeBlock` with the same degree of parallelism. The workloads of these blocks are fundamentally different (I/O-bound vs CPU-bound), so most probably their optimal DOP will be different as well. Another one: you could make the `htmlScrapeBlock` a `TransformManyBlock`, and get rid of the `manyToOneBlock`. In total only 3 blocks are needed, not 5. – Theodor Zoulias Aug 07 '20 at 06:51
  • @Andy - have a look on the update, there is an example code with issues i am facing. It was never about how to make web crawler and or download urls and so. It is more how (once you have it - and i do have:) ) make it fast for multiple urls and how to batch results to save to DB in batches. – Piotr Aug 07 '20 at 11:56
2

I'd suggest that you look at Microsoft's Reactive Framework (aka Rx) as it makes this kind of processing super simple.

If I can assume that you have a List<string> urls and you have the following methods:

Task<string> DownloadUrlAsync(string url)
Task<string> WebscrapeAsync(string content)
Task SaveDataToDBAsync(IList<string> data)

...then you can do this with Rx:

int buffer_size = 50;
IObservable<Unit> query =
    urls
        .ToObservable()
        .SelectMany(url => Observable.FromAsync(() => DownloadUrlAsync(url)))
        .SelectMany(content => Observable.FromAsync(() => WebscrapeAsync(content)))
        .Buffer(buffer_size)
        .SelectMany(buffer => Observable.FromAsync(() => SaveDataToDBAsync(buffer)));
        
IDisposable subscription = query.Subscribe();

That query handles all of the async calls using multiple threads, buffering the content and saving to the database.

The .Subscribe method also has callbacks to handle values as they are produced, any exception, and/or a completion.

You need to NuGet System.Reactive and add using System.Reactive.Linq; to get the bits.

Enigmativity
  • 113,464
  • 11
  • 89
  • 172
  • 1
    This solution has the virtue of succinctness, and also gets some points for style, but lacks all the powerful built-in options of the TPL Dataflow library: `CancellationToken`, `MaxDegreeOfParallelism`, `BoundedCapacity`, `EnsureOrdered` etc. It is also less enticing for people how are not already familiar with RX. If you have no RX experience and just want to get the job done with the least effort possible, RX is not for you. You'll become quickly frustrated by the intricacies of the library. Simple things, like finding how to await for the thing to complete, are surprisingly tricky. – Theodor Zoulias Aug 07 '20 at 06:35
  • @TheodorZoulias - It does have `CancellationToken` and `MaxDegreeOfParallelism`. I don't know what `BoundedCapacity` is. I can assume what `EnsureOrdered` is, and yes, that's tricky. You can simply `await` an observable. Nevertheless, Rx is an incredibly powerful library - far more powerful than TPL or the DataFlow library - so yes there is a lot to learn. – Enigmativity Aug 07 '20 at 07:14
  • The `BoundedCapacity` option allows to slow down a producer if the consumer can't keep up with the pace. For example if downloading pages happens to be faster than scrapping them, you could end up with thousands of uncompressed HTML documents clogging the RAM of the machine. Setting the `BoundedCapacity` of the scrapper to, say, 10 documents, will make the downloader to become temporarily blocked every time the input buffer of the scrapper is full. Does the RX offer such a feature? – Theodor Zoulias Aug 07 '20 at 08:54
  • @TheodorZoulias - There's nothing built-in that does that. Rx tries to stay pure and, as such, avoids feedback. However, all of the building blocks to do it are there. And that's the point I suspect that you're making. It's extremely capable, but rather obtuse in how to do some of these things. – Enigmativity Aug 07 '20 at 09:00
  • Yeap. And this is because, I assume, the RX was not created with the intention of building processing pipelines (like the TPL Dataflow), but with the intention of manipulating streams of incoming data. So trying to build a feature-rich processing pipeline with RX could become awkward, and could have performance implications too. – Theodor Zoulias Aug 07 '20 at 09:06
  • @TheodorZoulias - Naive implementations can have performance implications, but all of the fundamentals are there to write performant code. – Enigmativity Aug 07 '20 at 10:01
  • @Enigmativity - I just tied this approach and i did rewrite my example to RX - it looks nice. One question, how do you awaits results? – Piotr Aug 07 '20 at 12:16
  • @ENigmativity - sorry, silly question. Just await query :D – Piotr Aug 07 '20 at 12:18
  • 1
    I would like to note two caveats regarding awaiting observables. (1) If the observable is empty, an exception is thrown. Use the operator `DefaultIfEmpty` to guard against this possibility. (2) Awaiting an observable creates a subscription. If you also `Subscribe` to it manually, you'll end up with two subscriptions, and each subscription will cause an independent warming of the observable. In other words the urls will be processed twice, creating most probably a mess. So either `await` or `Subscribe`, not both! – Theodor Zoulias Aug 07 '20 at 12:23
  • 1
    @TheodorZoulias - fully agree. I did add `await query;` in this app and the result was processed twice :) Lucly it was on my demo app;) – Piotr Aug 07 '20 at 12:32
  • @Enigmativity - you wrote you can control MaxDegreeOfParallelism. How can you do that? – Piotr Aug 07 '20 at 19:34
  • Reviewing [one](https://stackoverflow.com/questions/63293400/tpl-dataflow-for-webcrawler/63294205?noredirect=1#comment111930007_63294205) of my previous comments, I gave an incorrect example of using the TPL Dataflow `BoundedCapacity` option. To slow down the producer, both the producer and the consumer must be bounded. If either of them is unbounded, then an unlimited amount of messages may accumulate in the output buffer of the producer **or** the input buffer of the consumer, allocating a potentially huge amount of memory in either case. – Theodor Zoulias Aug 07 '20 at 22:19
  • @Piotr - Observables are like enumerables - they are a definition of something that can be iterated later - delayed execution. So if you both `Subscribed` and did an `await` you will execute the observable twice. You just need to replace `IDisposable subscription = query.Subscribe();` with `await query;` - don't do both. – Enigmativity Aug 07 '20 at 23:19
  • @TheodorZoulias - The safe way to await an observable is generally `await query.ToArray();`. The `.ToArray()` operator changes the observable from a `IObservable`, that can produce zero or more values, to an `IObservable` that produces exactly one value (which will contain zero or more values). – Enigmativity Aug 07 '20 at 23:22
  • @TheodorZoulias - And generally you're interested in all the values returned by the observable so the `.ToArray()` trick should be used. If you don't use it then `await` only gives you the last value produced. – Enigmativity Aug 07 '20 at 23:23
  • @Piotr - If you change `.SelectMany(url => Observable.FromAsync(() => DownloadUrlAsync(url)))` to `.Select(url => Observable.FromAsync(() => DownloadUrlAsync(url))).Merge(maxDegreeOfParallelism)` you can control the parallelism. Note the change from `SelectMany` to `Select`. – Enigmativity Aug 07 '20 at 23:24
  • Yes, `await obs.ToArray()` can be useful in some cases. But in this particular case using `ToArray` will just result to allocating a buffer that will be discarded eventually. The OP does not want any final results, they just want to process their URLs. And the processing happens as a side-effect of transforming the observable. This is why I suggested the more efficient `DefaultIfEmpty`. Either way it's a trick though. Ideally we shouldn't have to do tricks in order to do the simplest things, like awaiting our observables to complete. – Theodor Zoulias Aug 07 '20 at 23:44
  • @TheodorZoulias - In this case the observable is returning `Unit` (which is a singleton struct with no fields or properties) and the OP is buffering. They would have to download gazillions of web pages before the return array of `Unit` is a memory issue. – Enigmativity Aug 08 '20 at 05:19