There are several issues with this code that limit its scalability:
- It performs both IO and CPU work in the same task without using asynchronous methods. The number of CPU-bound tasks that can run concurrently is limited by the number of cores, while far more asynchronous operations can be in flight at once.
- IO is blocking, not asynchronous, so the task (or rather its thread) sits idle while the OS retrieves the data.
- The code reads too much data and generates too many temporary objects: ReadAllText reads the entire file when only 1000 characters are needed, and since strings are immutable, html.Substring(1,1000) creates yet another string. All of these take up memory and have to be garbage-collected at some point (see the sketch after this list).
- ConcurrentBag isn't a general-use concurrent collection like ConcurrentQueue or ConcurrentDictionary. It uses thread-local storage to ensure the thread that created an item can retrieve it faster than other threads.
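For reference, the pattern being criticized presumably looks something like this. This is a rough reconstruction based on the calls mentioned above (ReadAllText, Substring, ConcurrentBag), not the question's exact code, and `files` stands for some pre-collected file list:

var results=new ConcurrentBag<HtmlDocument>();
Parallel.ForEach(files,file=>
{
    var html=File.ReadAllText(file.FullName); //blocking IO that reads the whole file
    var sample=html.Substring(1,1000);        //a second temporary string
    var doc=new HtmlDocument();
    doc.LoadHtml(sample);                     //CPU-bound parsing competing with the blocked IO threads
    results.Add(doc);                         //thread-affine collection used as a general-purpose bag
});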
.NET offers several high-level classes that can be used to construct parsing pipelines far more complex than loading, parsing and importing files. These include Dataflow blocks, Channels and Async Streams/IAsyncEnumerable.
One way to improve the question's code would be to use Dataflow blocks to enumerate the root folder, load the file contents and parse them in separate blocks, each with its own degree of parallelism (DOP).
First, the crawling, loading and parsing code can be extracted to separate methods:
record Search(DirectoryInfo root,string pattern);
record Sample(FileInfo file,string sample);
record SampleHtml(FileInfo file,HtmlDocument html);
IEnumerable<FileInfo> Crawl(Search search)
{
    var (root,pattern)=search;
    var searchOptions=new EnumerationOptions {
        RecurseSubdirectories=true,
        IgnoreInaccessible=true
    };
    return root.EnumerateFiles(pattern,searchOptions);
}
async Task<Sample> ReadSample(FileInfo file,int length)
{
    //read length+1 chars and skip the first one, mirroring the original Substring(1,1000)
    var buffer=new char[length+1];
    using var reader=file.OpenText();
    var count=await reader.ReadBlockAsync(buffer,0,length+1);
    var html=new String(buffer,1,count-1);
    return new Sample(file,html);
}
SampleHtml ParseSample(Sample sample)
{
    var html=new HtmlDocument();
    html.LoadHtml(sample.sample);
    return new SampleHtml(sample.file,html);
}
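For a single file these methods compose directly; the Dataflow pipeline below just runs the same steps for many files concurrently. A quick sanity check (the path is a placeholder):

var file=new FileInfo(path_to_some_html_file);
var sample=await ReadSample(file,1000);
var parsed=ParseSample(sample);
Console.WriteLine(parsed.html.DocumentNode.OuterHtml);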
Dataflow blocks can be used to create a pipeline of:
- A single-threaded file search block
- A loader block loading 2 files at a time
- A parser block parsing 4 samples at a time
- A result BufferBlock collecting the output of the parser
var loadOptions=new ExecutionDataflowBlockOptions{
    MaxDegreeOfParallelism=2,
    BoundedCapacity=1
};
var parseOptions=new ExecutionDataflowBlockOptions{
    MaxDegreeOfParallelism=4,
    BoundedCapacity=1
};

var crawler=new TransformManyBlock<Search,FileInfo>(search=>
    Crawl(search));
var loader=new TransformBlock<FileInfo,Sample>(file=>
    ReadSample(file,1000),loadOptions);
var parser=new TransformBlock<Sample,SampleHtml>(sample=>
    ParseSample(sample),parseOptions);
var results=new BufferBlock<SampleHtml>();

var linkOptions=new DataflowLinkOptions {
    PropagateCompletion = true
};
crawler.LinkTo(loader,linkOptions);
loader.LinkTo(parser,linkOptions);
//Don't propagate completion, we just cache results here
parser.LinkTo(results);
To use the pipeline, we post a search specification to the head block, crawler, and await until the final block, parser, has finished processing everything:
var root=new DirectoryInfo(path_to_root);
var pattern="*.html";
await crawler.SendAsync(new Search(root,pattern));
crawler.Complete();
await parser.Completion;
At this point results contains all the parsed documents. We can use TryReceive to pop items one by one, or TryReceiveAll to read everything into a container:

if(results.TryReceiveAll(out var docs))
{
    var last=docs[^1];
}
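If the parsed documents don't all need to be held in memory, the BufferBlock at the end could be replaced with an ActionBlock that consumes each result as soon as it's produced. A rough sketch, with the body of the action as a placeholder:

var consumer=new ActionBlock<SampleHtml>(doc=>
{
    //handle each document as soon as it's parsed,
    //e.g. pull out the title node
    var title=doc.html.DocumentNode.SelectSingleNode("//title");
});
//this time we do propagate completion, so we can await the consumer
parser.LinkTo(consumer,linkOptions);

With this wiring we'd await consumer.Completion instead of parser.Completion, and there's no result buffer to drain at the end.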
The loader and parser blocks have a BoundedCapacity of 1. This means their input buffer will only accept a single item beyond those being processed. Any upstream block will have to wait before posting new items, all the way up to the crawler. This prevents filling memory with objects that can't be processed fast enough.
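To see what the bound does in isolation, here's a small stand-alone sketch (not part of the pipeline above): a full block rejects Post immediately, while SendAsync simply waits until the block can accept the item. Linked blocks negotiate the same thing automatically by postponing offered messages, so nothing is lost.

var bounded=new ActionBlock<int>(
    async i=>await Task.Delay(1000),
    new ExecutionDataflowBlockOptions{ BoundedCapacity=1 });

for(int i=0;i<10;i++)
{
    if(!bounded.Post(i))            //rejected while the block is full
    {
        await bounded.SendAsync(i); //waits asynchronously for free capacity
    }
}
bounded.Complete();
await bounded.Completion;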
Reusing buffers
The ArrayPool class can provide reusable buffers and thus avoid allocating a new char[1001] buffer for each file. With a loader DOP of 2, this means we only need a couple of buffers instead of 3000:
async Task<Sample> ReadSample(FileInfo file,int length)
{
    var buffer=ArrayPool<char>.Shared.Rent(length+1);
    try
    {
        using var reader=file.OpenText();
        var count=await reader.ReadBlockAsync(buffer,0,length+1);
        var html=new String(buffer,1,count-1);
        return new Sample(file,html);
    }
    finally
    {
        ArrayPool<char>.Shared.Return(buffer);
    }
}
This still leaves the 3000 1000-char string objects. These can be eliminated if the loader and parser are modified to pass byte[] buffers instead of strings. HtmlAgilityPack's Load method can read from any Stream or StreamReader, which means it can also load from a MemoryStream wrapping a buffer.
The only problem is that UTF8 uses a variable number of bytes per character, so it's not possible to know in advance how many bytes are needed to read exactly 1000 characters. If the HTML files are expected to contain a lot of non-English text, ReadSample's length will have to be increased.
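As a rough illustration of the headroom needed, System.Text's Encoding.UTF8.GetByteCount shows how the byte count grows for 1000 characters of different scripts:

Console.WriteLine(Encoding.UTF8.GetByteCount(new string('a',1000)));  //1000 - ASCII, 1 byte/char
Console.WriteLine(Encoding.UTF8.GetByteCount(new string('α',1000)));  //2000 - Greek, 2 bytes/char
Console.WriteLine(Encoding.UTF8.GetByteCount(new string('漢',1000))); //3000 - CJK, 3 bytes/char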
record Sample(FileInfo file,byte[] buffer,int count);

async Task<Sample> ReadSample(FileInfo file,int length)
{
    var buffer=ArrayPool<byte>.Shared.Rent(length+1);
    using var stream=file.OpenRead();
    //a single ReadAsync is enough here - we only need a sample from the start of the file
    var count=await stream.ReadAsync(buffer,0,length+1);
    return new Sample(file,buffer,count);
}

SampleHtml ParseSample(Sample sample)
{
    try
    {
        var html=new HtmlDocument();
        //skip the first byte, mirroring the earlier new String(buffer,1,count-1)
        using var ms=new MemoryStream(sample.buffer,1,sample.count-1);
        html.Load(ms);
        return new SampleHtml(sample.file,html);
    }
    finally
    {
        //return the rented buffer once the sample has been parsed
        ArrayPool<byte>.Shared.Return(sample.buffer);
    }
}
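For completeness, the same three stages could also be wired with the Channels class mentioned at the start. This is only a sketch, with each stage running on a single task; to match the loader and parser DOP you'd start several load or parse tasks reading from the same ChannelReader. Bounded channels give the same backpressure effect as BoundedCapacity:

var files=Channel.CreateBounded<FileInfo>(1);
var samples=Channel.CreateBounded<Sample>(1);

var crawlTask=Task.Run(async ()=>
{
    foreach(var file in Crawl(new Search(root,"*.html")))
        await files.Writer.WriteAsync(file);
    files.Writer.Complete();
});

var loadTask=Task.Run(async ()=>
{
    await foreach(var file in files.Reader.ReadAllAsync())
        await samples.Writer.WriteAsync(await ReadSample(file,1000));
    samples.Writer.Complete();
});

var parseTask=Task.Run(async ()=>
{
    await foreach(var sample in samples.Reader.ReadAllAsync())
    {
        var doc=ParseSample(sample);
        //consume each SampleHtml here
    }
});

await Task.WhenAll(crawlTask,loadTask,parseTask);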