I have a working pipeline for web scraping. It downloads the page based on the URL, parses the content, then splits the results into batches (every x elements are saved to the DB). And it works fine.

But I also need one additional step that summarizes everything that was done in the pipeline. My current implementation passes the same object through the pipeline (each step only adds some values), so I made some repro code that should show what I want to achieve.

Console.WriteLine($"Processing started: {DateTime.Now.ToString()}");
var workBuffer = new BatchBlock<string>(3);
var resultsToFinalize = new List<string>();
var downloadUrl = new TransformBlock<string, string>(url =>
{
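    // simulate a download that takes <last digit of the URL> seconds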
    Thread.Sleep(int.Parse(url.Last().ToString()) * 1000);
    return url;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

var parseContent = new TransformBlock<string, string>(content =>
{
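    // simulate parsing taking half the download time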
    Thread.Sleep(int.Parse(content.Last().ToString()) * 1000 / 2);
    return $"parsing result for: {content}";
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

var saveToDb = new ActionBlock<string[]>(results =>
{
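    // "save" the batch: print it and stash it for the final summary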
    Console.WriteLine($"results: {DateTime.Now.ToString()} {String.Join(", ", results)}");
    results.ToList().ForEach(t => resultsToFinalize.Add(t));
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

downloadUrl.LinkTo(parseContent, new DataflowLinkOptions
{
    PropagateCompletion = true
});
parseContent.LinkTo(workBuffer, new DataflowLinkOptions
{
    PropagateCompletion = true
});
workBuffer.LinkTo(saveToDb, new DataflowLinkOptions
{
    PropagateCompletion = true
});

// note: .Select() is lazy and would never actually post anything, so enumerate eagerly instead
foreach (var t in Enumerable.Range(2, 9))
    downloadUrl.Post($"http://some_site_to_parse.com{t}");

downloadUrl.Complete();
saveToDb.Completion.Wait();
Console.WriteLine(string.Join(Environment.NewLine, resultsToFinalize));

Currently it works, but only because the external resultsToFinalize variable gathers all the results.

Probably I should change saveToDb to a TransformBlock. But how do I accumulate the results until the whole pipeline is done? Ideally this additional block would fire only after everything before it has finished.

I could, theoretically, create one more BatchBlock before this additional step and set its size to the input size, but this seems a little hacky :)
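
To show what I mean, roughly (just a sketch of the hack, with the batch size hard-coded to match the 9 URLs the code posts; finalBatch and finalize are names I made up):

var saveToDb = new TransformManyBlock<string[], string>(results =>
{
    Console.WriteLine($"results: {DateTime.Now} {string.Join(", ", results)}");
    return results; // re-emit each saved item
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

var finalBatch = new BatchBlock<string>(9); // 9 == total number of inputs
var finalize = new ActionBlock<string[]>(all =>
    Console.WriteLine(string.Join(Environment.NewLine, all)));

saveToDb.LinkTo(finalBatch, new DataflowLinkOptions { PropagateCompletion = true });
finalBatch.LinkTo(finalize, new DataflowLinkOptions { PropagateCompletion = true });
// ...and wait on finalize.Completion instead of saveToDb.Completion

This fires only once precisely because the batch size matches the input count, which is exactly why it feels hacky.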

How could you tackle this issue otherwise?

[UPDATE 12.08.2020 16:47] It seems some folks find it unclear what I want to achieve, so I will make it explicit: I've posted some code above, and I want the same output that my code currently produces:

Processing started: 12.08.2020 16:27:46
results: 12.08.2020 16:27:52 parsing result for: http://some_site_to_parse.com2, parsing result for: http://some_site_to_parse.com3, parsing result for: http://some_site_to_parse.com4
results: 12.08.2020 16:27:57 parsing result for: http://some_site_to_parse.com5, parsing result for: http://some_site_to_parse.com6, parsing result for: http://some_site_to_parse.com7
results: 12.08.2020 16:28:00 parsing result for: http://some_site_to_parse.com8, parsing result for: http://some_site_to_parse.com9, parsing result for: http://some_site_to_parse.com10
parsing result for: http://some_site_to_parse.com2
parsing result for: http://some_site_to_parse.com3
parsing result for: http://some_site_to_parse.com4
parsing result for: http://some_site_to_parse.com5
parsing result for: http://some_site_to_parse.com6
parsing result for: http://some_site_to_parse.com7
parsing result for: http://some_site_to_parse.com8
parsing result for: http://some_site_to_parse.com9
parsing result for: http://some_site_to_parse.com10

but without using resultsToFinalize (using the power of TPL :) )

I assume that saveToDb should probably be changed from an ActionBlock to a TransformBlock, and that there should be some new ActionBlock at the end. The question is how to set it up so that it fires only ONCE.
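
A rough sketch of the shape I imagine (along the lines of the TransformBlock-feeding-a-BufferBlock idea from the comments below; untested, and finalBuffer is a name I made up):

var saveToDb = new TransformBlock<string[], string[]>(results =>
{
    Console.WriteLine($"results: {DateTime.Now} {string.Join(", ", results)}");
    return results; // pass the saved batch along
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

var finalBuffer = new BufferBlock<string[]>();
saveToDb.LinkTo(finalBuffer, new DataflowLinkOptions { PropagateCompletion = true });

// ...post the URLs and call downloadUrl.Complete() as before...
saveToDb.Completion.Wait(); // every batch has been pushed into finalBuffer by now
if (finalBuffer.TryReceiveAll(out var batches))
    Console.WriteLine(string.Join(Environment.NewLine, batches.SelectMany(b => b)));

The summary still runs exactly once, but outside the graph, after waiting for saveToDb to complete; I'd still prefer the final step to be a block inside the pipeline.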

  • It's not hacky. If you want to write out data in batches, you need something to batch the data. Once you have a BatchBlock, you can use `SqlBulkCopy` or whatever your database offers to import data in bulk, with minimal logging, instead of inserting rows one-by-one. – Panagiotis Kanavos Aug 12 '20 at 14:40
  • @Panagiotis Kanavos No, the db part is working. It is batching, for example, every 5 pages. That is not the problem. I just need one more step that will gather ALL pages (so saveToDb should probably be changed from an ActionBlock to a TransformBlock, and one more block, potentially an ActionBlock, added). But doing it as I've just described would mean that this last ActionBlock will be fired as many times as the saveToDb TransformBlock (and I want it to fire ONLY ONCE, after gathering ALL the results). Surely it can be done without an external list and by using TPL Dataflow blocks – Piotr Aug 12 '20 at 14:44
  • Why do you need that at all? Post sample code that actually demonstrates the problem – Panagiotis Kanavos Aug 12 '20 at 14:44
  • @Panagiotis Kanavos - does it matter why? :) It should be totally irrelevant :) – Piotr Aug 12 '20 at 14:45
  • @Panagiotis Kanavos - I've posted the code. The results of the proposal should be the same as they are right now, but without the `resultsToFinalize` variable – Piotr Aug 12 '20 at 14:46
  • It's the only thing that matters - it sounds like you want to use DataFlow as if it was a *workflow* - it's not. If you ever used SSIS you'll notice that Dataflows are separate from workflows. Workflows contain dataflows as steps. When you want something to be done after something else, you have workflow steps, not a data flow pipeline – Panagiotis Kanavos Aug 12 '20 at 14:47
  • While you can make a pipeline work as a workflow, it can be tricky. A pipeline processes messages that can be anything, not just data records. You could have a `Message` that contained both the data and a success/error indicator, to allow your pipeline to process bad records without stopping. You could also have a flag in that `Message` marking the last of the records. As you pass that record down the pipeline, each block could perform some special job. `saveToDb` could be a `TransformBlock` that only emitted something once it received a `LastRecord` message – Panagiotis Kanavos Aug 12 '20 at 14:52
  • Check [Creating a custom data flow block](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/walkthrough-creating-a-custom-dataflow-block-type) in the docs. `saveToDb` could be a `TransformBlock` that propagated messages after processing them to a custom block. That block could store the messages, the same way the first sliding window example does, and process all of them once the `Last` message arrives – Panagiotis Kanavos Aug 12 '20 at 14:55
  • Or, you could just let your pipeline work as a pipeline (Message is still useful) and execute your final step once `saveToDb` completed. Making `saveToDb` a TransformBlock feeding a `BufferBlock` is perfectly fine, but you still have to wait for `saveToDb`'s completion – Panagiotis Kanavos Aug 12 '20 at 14:58
  • Take a look at this: [How do I get my hands on all the output produced by a TransformBlock while blocking until all inputs have been processed?](https://stackoverflow.com/questions/49389273/for-a-tpl-dataflow-how-do-i-get-my-hands-on-all-the-output-produced-by-a-transf/62410007#62410007). What you have to do is to convert the last step to a `TransformBlock`, and instead of awaiting its completion to await the task returned by the `ToListAsync` method of my answer. – Theodor Zoulias Aug 12 '20 at 15:31
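
For reference, a minimal sketch of a ToListAsync helper like the one mentioned in the last comment (an assumed implementation, not the exact code from the linked answer):

using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowExtensions
{
    // Collects everything the source block emits; the returned task
    // completes only after the block itself has completed.
    public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> source)
    {
        var list = new List<T>();
        while (await source.OutputAvailableAsync().ConfigureAwait(false))
        {
            while (source.TryReceive(null, out var item))
                list.Add(item);
        }
        await source.Completion.ConfigureAwait(false); // surface any pipeline exception
        return list;
    }
}

With saveToDb converted to a TransformBlock<string[], string[]> that is not linked to anything downstream, the final wait-and-print could then become: var batches = await saveToDb.ToListAsync();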
