I have a working pipeline for web scraping. It downloads a page based on its URL, parses the content, then splits the results into blocks (every x elements are saved to the DB). It works fine.
But I also need one additional step that summarizes everything the pipeline has done. My current implementation passes the same object through the pipeline (only adding some values at each step), so I made some repro code that should show what I want to achieve.
Console.WriteLine($"Processing started: {DateTime.Now.ToString()}");
var workBuffer = new BatchBlock<string>(3);
var resultsToFinalize = new List<string>();
var downloadUrl = new TransformBlock<string, string>(url =>
{
Thread.Sleep(int.Parse(url.Last().ToString()) * 1000);
return url;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
var parseContent = new TransformBlock<string, string>(content =>
{
Thread.Sleep(int.Parse(content.Last().ToString()) * 1000 / 2);
return $"parsing result for: {content}";
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
var saveToDb = new ActionBlock<string[]>(results =>
{
Console.WriteLine($"results: {DateTime.Now.ToString()} {String.Join(", ", results)}");
resultsToFinalize.AddRange(results);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
downloadUrl.LinkTo(parseContent, new DataflowLinkOptions
{
PropagateCompletion = true
});
parseContent.LinkTo(workBuffer, new DataflowLinkOptions
{
PropagateCompletion = true
});
workBuffer.LinkTo(saveToDb, new DataflowLinkOptions
{
PropagateCompletion = true
});
foreach (var t in Enumerable.Range(2, 9))
    downloadUrl.Post($"http://some_site_to_parse.com{t}");
downloadUrl.Complete();
saveToDb.Completion.Wait();
Console.WriteLine(String.Join(Environment.NewLine, resultsToFinalize));
Currently it works because there is an external resultsToFinalize variable that gathers all the results.
Probably I should change saveToDb to a TransformBlock. But how do I accumulate the results until the whole pipeline is done? (Ideally, this additional block would fire only after all previous blocks have finished.)
I could, theoretically, create one more BatchBlock before this additional step and set its size to the input size, but this seems a little hacky :)
How could you tackle this issue otherwise?
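For reference, the "one more BatchBlock sized to the input" idea can be sketched like this (names such as SizedBatchDemo, finalizeBuffer, and summarize are mine, not from the original code, and saveToDb is reduced to a pass-through for brevity):

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class SizedBatchDemo
{
    // The "hacky" variant: a BatchBlock whose batch size equals the input
    // count, so the final summary block receives everything in one batch.
    public static string[] Run(string[] inputs)
    {
        string[] finalBatch = Array.Empty<string>();

        var workBuffer = new BatchBlock<string>(3);
        // saveToDb becomes a TransformManyBlock so each saved item also
        // flows on to the summary stage (the DB write is omitted here).
        var saveToDb = new TransformManyBlock<string[], string>(results => results);
        // Sized to the number of inputs, so exactly one batch is produced.
        var finalizeBuffer = new BatchBlock<string>(inputs.Length);
        var summarize = new ActionBlock<string[]>(all => finalBatch = all);

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        workBuffer.LinkTo(saveToDb, link);
        saveToDb.LinkTo(finalizeBuffer, link);
        finalizeBuffer.LinkTo(summarize, link);

        foreach (var input in inputs)
            workBuffer.Post(input);
        workBuffer.Complete();
        summarize.Completion.Wait();

        return finalBatch;
    }
}
```

The obvious drawback, as noted, is that the total input count must be known up front.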
[UPDATE 12.08.2020 16:47] It seems that what I want to achieve is unclear to some folks, so I will make it explicit: I've posted some code, and I want the same output that it currently produces:
Processing started: 12.08.2020 16:27:46
results: 12.08.2020 16:27:52 parsing result for: http://some_site_to_parse.com2, parsing result for: http://some_site_to_parse.com3, parsing result for: http://some_site_to_parse.com4
results: 12.08.2020 16:27:57 parsing result for: http://some_site_to_parse.com5, parsing result for: http://some_site_to_parse.com6, parsing result for: http://some_site_to_parse.com7
results: 12.08.2020 16:28:00 parsing result for: http://some_site_to_parse.com8, parsing result for: http://some_site_to_parse.com9, parsing result for: http://some_site_to_parse.com10
parsing result for: http://some_site_to_parse.com2
parsing result for: http://some_site_to_parse.com3
parsing result for: http://some_site_to_parse.com4
parsing result for: http://some_site_to_parse.com5
parsing result for: http://some_site_to_parse.com6
parsing result for: http://some_site_to_parse.com7
parsing result for: http://some_site_to_parse.com8
parsing result for: http://some_site_to_parse.com9
parsing result for: http://some_site_to_parse.com10
but without using resultsToFinalize (using the power of TPL :) )
I assume saveToDb should probably be changed from an ActionBlock to a TransformBlock, and there should probably be a new ActionBlock at the end. The question is how to set it up so that it fires only ONCE.
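One pattern that avoids knowing the input size up front, as a sketch: make saveToDb a TransformManyBlock and link it to a BatchBlock with an effectively unlimited batch size. A greedy BatchBlock emits whatever it has buffered as a final, smaller batch when completion propagates to it, so the summary ActionBlock fires exactly once, after everything upstream has finished. Names like PipelineDemo, finalizeBuffer, and summarize are mine, and the download/sleep simulation is dropped for brevity:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks.Dataflow;

public static class PipelineDemo
{
    // Runs the pipeline and returns the single batch that the summary
    // stage received, demonstrating that it fires only once.
    public static string[] Run()
    {
        string[] finalBatch = Array.Empty<string>();

        var parseContent = new TransformBlock<string, string>(
            url => $"parsing result for: {url}");
        var workBuffer = new BatchBlock<string>(3);
        // saveToDb now forwards what it saved instead of writing to an
        // external list; TransformManyBlock flattens string[] back to strings.
        var saveToDb = new TransformManyBlock<string[], string>(results =>
        {
            Console.WriteLine($"results: {string.Join(", ", results)}");
            return results;
        });
        // Effectively unbounded batch size: the only batch it ever emits is
        // the final one, produced when completion propagates from saveToDb.
        var finalizeBuffer = new BatchBlock<string>(int.MaxValue);
        var summarize = new ActionBlock<string[]>(all => finalBatch = all);

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        parseContent.LinkTo(workBuffer, link);
        workBuffer.LinkTo(saveToDb, link);
        saveToDb.LinkTo(finalizeBuffer, link);
        finalizeBuffer.LinkTo(summarize, link);

        foreach (var t in Enumerable.Range(2, 9))
            parseContent.Post($"http://some_site_to_parse.com{t}");
        parseContent.Complete();
        summarize.Completion.Wait();

        return finalBatch;
    }
}
```

Printing `string.Join(Environment.NewLine, PipelineDemo.Run())` then gives the same final listing as the original resultsToFinalize loop.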