Background: I am trying to solve the problem of embedding a retry policy in a workflow, and I am not particularly satisfied with the solutions I have found so far. So I am posting here a generalized version of this problem, asking for a superior solution, if one exists.
Here is the generalized problem: I have a `TransformBlock` that applies a lengthy transformation to the items it receives. This transformation includes at least two I/O operations that should be separated by a specific minimum time delay. During this delay the `TransformBlock` should be allowed to process other items; in other words, this delay should not count towards the `MaxDegreeOfParallelism` limit of the block. This time span could be considered "dead time" for the processed item, but it should not be dead time for the block as a whole. Here is an example of what I am trying to achieve:
```csharp
TransformBlock<int, string> block = CreateDeadTimeTransformBlock<int, string>(
    async (item, deadTime) =>
    {
        Console.WriteLine($"Processing #{item}/A Started");
        await Task.Delay(5000); // Simulate some I/O operation
        Console.WriteLine($"Processing #{item}/A Finished");
        await deadTime.MinimumDelayAsync(TimeSpan.FromSeconds(10));
        //await Task.Delay(TimeSpan.FromSeconds(10)); // Undesirable
        Console.WriteLine($"Processing #{item}/B Started");
        await Task.Delay(5000); // Simulate some I/O operation
        Console.WriteLine($"Processing #{item}/B Finished");
        return item.ToString();
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 2
    });
```
In the above example a `deadTime` object is passed along with the processed item, and the client code can invoke its `MinimumDelayAsync` method to activate a dead-time interval. So after the first two items have completed their "A" stage and have entered their dead-time period, the processing of the 3rd and 4th items should be able to start immediately, without having to wait for the completion of the "B" stage of the 1st and 2nd items.
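For context, the block above would be driven like any other dataflow block. A minimal usage sketch (the item count is illustrative, and the code is assumed to run inside an async method):

```csharp
// Post four items; with MaxDegreeOfParallelism = 2, items #3 and #4
// should start their "A" stage as soon as #1 and #2 enter their dead time.
for (int i = 1; i <= 4; i++) block.Post(i);
block.Complete();

// Consume the results in order (EnsureOrdered is true by default).
while (await block.OutputAvailableAsync())
{
    Console.WriteLine($"Result: {await block.ReceiveAsync()}");
}
await block.Completion;
```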
My best attempt so far is to introduce a `SemaphoreSlim` for controlling the level of concurrency, and to disable the `MaxDegreeOfParallelism` functionality of the `TransformBlock` by making it unbounded. This creates the problem that the items acquire the semaphore in random order, instead of the order in which they are posted to the block. I noticed that this problem can be fixed by configuring the block with a `TaskScheduler` of limited concurrency, equal to the specified `MaxDegreeOfParallelism`. I am not 100% sure that this is a robust solution to the problem, nor am I thrilled by the fact that all the items posted to the block end up in the semaphore's queue as expensive continuations, instead of staying economically in the input queue of the block.
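To illustrate the limited-concurrency scheduler idea in isolation: a `ConcurrentExclusiveSchedulerPair` can cap how many tasks run at once on top of an underlying scheduler, dequeuing the queued tasks in FIFO order. A minimal standalone sketch (the `Thread.Sleep` is just a placeholder for real work):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class SchedulerDemo
{
    static void Main()
    {
        // The ConcurrentScheduler of this pair runs at most 2 tasks
        // concurrently, on top of the default thread-pool scheduler.
        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 2);
        var factory = new TaskFactory(pair.ConcurrentScheduler);

        int current = 0, maxObserved = 0;
        var tasks = new Task[8];
        for (int i = 0; i < tasks.Length; i++)
        {
            tasks[i] = factory.StartNew(() =>
            {
                int now = Interlocked.Increment(ref current);
                // Record the highest concurrency level observed so far
                int oldMax;
                while (now > (oldMax = Volatile.Read(ref maxObserved)) &&
                    Interlocked.CompareExchange(ref maxObserved, now, oldMax) != oldMax) { }
                Thread.Sleep(100); // Placeholder for real work
                Interlocked.Decrement(ref current);
            });
        }
        Task.WaitAll(tasks);
        Console.WriteLine($"Max concurrency observed: {maxObserved}");
    }
}
```

The printed value should never exceed 2, which is why giving this scheduler to the block preserves both the concurrency limit and the posting order.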
Another problem with this approach is that it prioritizes processing the "A" phase of new items over the "B" phase of items that are already in progress. This behavior can be partially controlled by configuring the `BoundedCapacity` option, setting a limit on how many items can be concurrently in progress in total.
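For example, a bounded configuration might look like this (the capacity value is just an assumption for illustration):

```csharp
var options = new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 2,
    // At most 4 items can be buffered or in progress inside the block;
    // beyond that, Post returns false and SendAsync waits asynchronously.
    BoundedCapacity = 4
};
```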
The positive aspect of this solution is that it preserves the whole functionality of the `TransformBlock`, especially its `EnsureOrdered` and `BoundedCapacity` options.
Am I missing a better solution?
Here is my currently best attempt at implementing the `CreateDeadTimeTransformBlock` method:
```csharp
public class DeadTime
{
    private readonly SemaphoreSlim _semaphore;
    private readonly CancellationToken _cancellationToken;

    public DeadTime(SemaphoreSlim semaphore, CancellationToken cancellationToken)
    {
        _semaphore = semaphore;
        _cancellationToken = cancellationToken;
    }

    public async Task MinimumDelayAsync(TimeSpan timeSpan)
    {
        _semaphore.Release(); // Free a slot for other items during the dead time
        try
        {
            await Task.Delay(timeSpan, _cancellationToken);
        }
        finally
        {
            await _semaphore.WaitAsync(); // Reacquire a slot before resuming
        }
    }
}
```
```csharp
public static TransformBlock<TInput, TOutput>
    CreateDeadTimeTransformBlock<TInput, TOutput>(
    Func<TInput, DeadTime, Task<TOutput>> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions)
{
    var cancellationToken = dataflowBlockOptions.CancellationToken;
    var maxDOP = dataflowBlockOptions.MaxDegreeOfParallelism;
    var taskScheduler = dataflowBlockOptions.TaskScheduler;
    SemaphoreSlim semaphore;
    if (maxDOP == DataflowBlockOptions.Unbounded)
    {
        // This is not an interesting case
        semaphore = new SemaphoreSlim(Int32.MaxValue);
    }
    else
    {
        semaphore = new SemaphoreSlim(maxDOP, maxDOP);
        // The degree of parallelism is controlled by the semaphore
        dataflowBlockOptions.MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded;
        // Use a limited-concurrency scheduler for preserving the processing order
        dataflowBlockOptions.TaskScheduler = new ConcurrentExclusiveSchedulerPair(
            taskScheduler, maxDOP).ConcurrentScheduler;
    }
    var deadTime = new DeadTime(semaphore, cancellationToken);
    var block = new TransformBlock<TInput, TOutput>(async item =>
    {
        await semaphore.WaitAsync(cancellationToken);
        try
        {
            return await transform(item, deadTime);
        }
        finally
        {
            semaphore.Release();
        }
    }, dataflowBlockOptions);
    dataflowBlockOptions.MaxDegreeOfParallelism = maxDOP; // Restore initial value
    dataflowBlockOptions.TaskScheduler = taskScheduler; // Restore initial value
    return block;
}
```
A Fiddle with the complete example.