I am trying to implement a data processing pipeline using TPL Dataflow
. However, I am relatively new to dataflow and not completely sure how to use it properly for the problem I am trying to solve.
Problem:
I am trying to iterate through the list of files and process each file to read some data and then further process that data. Each file is roughly 700MB
to 1GB
in size. Each file contains JSON
data. In order to process these files in parallel and not run of of memory, I am trying to use IEnumerable<>
with yield return
and then further process the data.
Once I get list of files, I want to process maximum 4-5 files at a time in parallel. My confusion comes from:
- How to use
IEnumerable<>
andyeild return
withasync/await
and dataflow. Came across this answer by svick, but still not sure how to convertIEnumerable<>
toISourceBlock
and then link all blocks together and track completion. - In my case,
producer
will be really fast (going through list of files), butconsumer
will be very slow (processing each file - read data, deserializeJSON
). In this case, how to track completion. - Should I use
LinkTo
feature of datablocks to connect various blocks? or use method such asOutputAvailableAsync()
andReceiveAsync()
to propagate data from one block to another.
Code:
private const int ProcessingSize= 4;
private BufferBlock<string> _fileBufferBlock;
private ActionBlock<string> _processingBlock;
private BufferBlock<DataType> _messageBufferBlock;
public Task ProduceAsync()
{
PrepareDataflow(token);
var bufferTask = ListFilesAsync(_fileBufferBlock, token);
var tasks = new List<Task> { bufferTask, _processingBlock.Completion };
return Task.WhenAll(tasks);
}
private async Task ListFilesAsync(ITargetBlock<string> targetBlock, CancellationToken token)
{
...
// Get list of file Uris
...
foreach(var fileNameUri in fileNameUris)
await targetBlock.SendAsync(fileNameUri, token);
targetBlock.Complete();
}
private async Task ProcessFileAsync(string fileNameUri, CancellationToken token)
{
var httpClient = new HttpClient();
try
{
using (var stream = await httpClient.GetStreamAsync(fileNameUri))
using (var sr = new StreamReader(stream))
using (var jsonTextReader = new JsonTextReader(sr))
{
while (jsonTextReader.Read())
{
if (jsonTextReader.TokenType == JsonToken.StartObject)
{
try
{
var data = _jsonSerializer.Deserialize<DataType>(jsonTextReader)
await _messageBufferBlock.SendAsync(data, token);
}
catch (Exception ex)
{
_logger.Error(ex, $"JSON deserialization failed - {fileNameUri}");
}
}
}
}
}
catch(Exception ex)
{
// Should throw?
// Or if converted to block then report using Fault() method?
}
finally
{
httpClient.Dispose();
buffer.Complete();
}
}
private void PrepareDataflow(CancellationToken token)
{
_fileBufferBlock = new BufferBlock<string>(new DataflowBlockOptions
{
CancellationToken = token
});
var actionExecuteOptions = new ExecutionDataflowBlockOptions
{
CancellationToken = token,
BoundedCapacity = ProcessingSize,
MaxMessagesPerTask = 1,
MaxDegreeOfParallelism = ProcessingSize
};
_processingBlock = new ActionBlock<string>(async fileName =>
{
try
{
await ProcessFileAsync(fileName, token);
}
catch (Exception ex)
{
_logger.Fatal(ex, $"Failed to process fiel: {fileName}, Error: {ex.Message}");
// Should fault the block?
}
}, actionExecuteOptions);
_fileBufferBlock.LinkTo(_processingBlock, new DataflowLinkOptions { PropagateCompletion = true });
_messageBufferBlock = new BufferBlock<DataType>(new ExecutionDataflowBlockOptions
{
CancellationToken = token,
BoundedCapacity = 50000
});
_messageBufferBlock.LinkTo(DataflowBlock.NullTarget<DataType>());
}
In the above code, I am not using IEnumerable<DataType>
and yield return
as I cannot use it with async/await
. So I am linking input buffer to ActionBlock<DataType>
which in turn posts to another queue. However by using ActionBlock<>
, I cannot link it to next block for processing and have to manually Post/SendAsync
from ActionBlock<>
to BufferBlock<>
. Also, in this case, not sure, how to track completion.
This code works, but, I am sure there could be better solution then this and I can just link all the block (instead of ActionBlock<DataType>
and then sending messages from it to BufferBlock<DataType>
)
Another option could be to convert IEnumerable<>
to IObservable<>
using Rx
, but again I am not much familiar with Rx
and don't know exactly how to mix TPL Dataflow
and Rx