
Background: I am trying to solve the problem of embedding a retry policy in a workflow, and I am not particularly satisfied with the solutions I have found so far. So I am posting here a generalized version of the problem, asking for a superior solution, if one exists.

Here is the generalized problem: I have a TransformBlock that applies a lengthy transformation to the items it receives. This transformation includes at least two I/O operations that should be socially distanced by a specific minimum time delay. During this delay, the TransformBlock should be allowed to process other items. In other words, this delay should not count towards the MaxDegreeOfParallelism limit of the block. This time span could be considered "dead time" for the processed item, but should not be dead time for the block as a whole. Here is an example of what I am trying to achieve:

TransformBlock<int, string> block = CreateDeadTimeTransformBlock<int, string>(
    async (item, deadTime) =>
{
    Console.WriteLine($"Processing #{item}/A Started");
    await Task.Delay(5000); // Simulate some I/O operation
    Console.WriteLine($"Processing #{item}/A Finished");

    await deadTime.MinimumDelayAsync(TimeSpan.FromSeconds(10));
    //await Task.Delay(TimeSpan.FromSeconds(10)); // Undesirable

    Console.WriteLine($"Processing #{item}/B Started");
    await Task.Delay(5000); // Simulate some I/O operation
    Console.WriteLine($"Processing #{item}/B Finished");
    return item.ToString();
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 2
});

In the above example, a deadTime object is passed along with the processed item, and the client code can invoke its MinimumDelayAsync method to activate a dead-time interval. So after the first two items have completed their "A" stage and have entered their dead-time period, the processing of the 3rd and 4th items should be able to start immediately, without having to wait for the completion of the "B" stage of the 1st and 2nd items.

My best attempt so far is to introduce a SemaphoreSlim for controlling the level of concurrency, and to disable the MaxDegreeOfParallelism functionality of the TransformBlock by making it unbounded. This creates the problem that the items acquire the semaphore in random order, instead of the order in which they are posted to the block. I noticed that this problem can be fixed by configuring the block with a TaskScheduler of limited concurrency, equal to the specified MaxDegreeOfParallelism. I am not 100% sure that this is a robust solution to the problem, nor am I thrilled by the fact that all the items posted to the block end up in the semaphore's queue as expensive continuations, instead of staying economically in the input queue of the block.

Another problem with this approach is that it prioritizes processing the "A" phase of new items over the "B" phase of items that are already in progress. This behavior can be partially controlled by configuring the BoundedCapacity option, setting a limit on how many items can be in progress in total.
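For illustration, here is a sketch of what such a configuration could look like (the specific BoundedCapacity value of 4 is arbitrary, and transform stands for the (item, deadTime) lambda from the first snippet):

TransformBlock<int, string> block = CreateDeadTimeTransformBlock<int, string>(
    transform, // the (item, deadTime) lambda from the first snippet
    new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 2,
        // Caps the items buffered by the block (input queue, in-flight items
        // including those in their dead time, and output queue). Arbitrary value.
        BoundedCapacity = 4
    });

With a bounded capacity, the producer should feed the block with SendAsync rather than Post, so that it awaits for available space instead of having items rejected.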

A positive aspect of this solution is that it preserves the full functionality of the TransformBlock, and especially its EnsureOrdered and BoundedCapacity options.

Am I missing a better solution?

Here is my current best attempt at implementing the CreateDeadTimeTransformBlock method:

public class DeadTime
{
    private readonly SemaphoreSlim _semaphore;
    private readonly CancellationToken _cancellationToken;

    public DeadTime(SemaphoreSlim semaphore, CancellationToken cancellationToken)
    {
        _semaphore = semaphore;
        _cancellationToken = cancellationToken;
    }

    public async Task MinimumDelayAsync(TimeSpan timeSpan)
    {
        // Give up the concurrency slot for the duration of the dead time,
        // so that the block can start processing other items.
        _semaphore.Release();
        try
        {
            await Task.Delay(timeSpan, _cancellationToken);
        }
        finally
        {
            // Reacquire a slot before resuming, even if the delay was canceled,
            // so that the Release in the block's delegate stays balanced.
            await _semaphore.WaitAsync();
        }
    }
}

public static TransformBlock<TInput, TOutput>
    CreateDeadTimeTransformBlock<TInput, TOutput>(
    Func<TInput, DeadTime, Task<TOutput>> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions)
{
    var cancellationToken = dataflowBlockOptions.CancellationToken;
    var maxDOP = dataflowBlockOptions.MaxDegreeOfParallelism;
    var taskScheduler = dataflowBlockOptions.TaskScheduler;

    SemaphoreSlim semaphore;
    if (maxDOP == DataflowBlockOptions.Unbounded)
    {
        // This is not an interesting case
        semaphore = new SemaphoreSlim(Int32.MaxValue);
    }
    else
    {
        semaphore = new SemaphoreSlim(maxDOP, maxDOP);

        // The degree of parallelism is controlled by the semaphore
        dataflowBlockOptions.MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded;

        // Use a limited-concurrency scheduler for preserving the processing order
        dataflowBlockOptions.TaskScheduler = new ConcurrentExclusiveSchedulerPair(
            taskScheduler, maxDOP).ConcurrentScheduler;
    }

    var deadTime = new DeadTime(semaphore, cancellationToken);

    var block = new TransformBlock<TInput, TOutput>(async item =>
    {
        await semaphore.WaitAsync(cancellationToken);
        try
        {
            return await transform(item, deadTime);
        }
        finally
        {
            semaphore.Release();
        }
    }, dataflowBlockOptions);

    dataflowBlockOptions.MaxDegreeOfParallelism = maxDOP; // Restore initial value
    dataflowBlockOptions.TaskScheduler = taskScheduler; // Restore initial value
    return block;
}

Fiddle: complete example.
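For anyone who prefers not to follow the fiddle link, here is a minimal usage sketch that feeds the block from the first snippet and drains its output (the item count of 4 is arbitrary):

for (int i = 1; i <= 4; i++) block.Post(i);
block.Complete();

while (await block.OutputAvailableAsync())
{
    Console.WriteLine($"Result: {await block.ReceiveAsync()}");
}
await block.Completion;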

Theodor Zoulias
  • I think the best you can achieve here is splitting `processA`, `deadTime` and `processB` out into separate internal blocks and encapsulating them. It's not exactly what you're looking for, but it could work. I put together a sample as a demo, but I'm not entirely happy with the results. Maybe dataflow just isn't the right hammer for this nail. – JSteward Jun 15 '20 at 22:20
  • Hi @JSteward, thanks for looking at this problem! I have come to a similar conclusion myself, that a single-block approach is too problematic, and encapsulating multiple blocks is probably the way to go. – Theodor Zoulias Jun 16 '20 at 03:33
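A rough sketch of the multi-block idea discussed in the comments could look like the following (the processA/processB delegates, the stage types and the use of DataflowBlock.Encapsulate are assumptions for illustration only, not JSteward's actual sample):

// Stage A and stage B run in separate TransformBlocks, and the dead time is spent
// inside an unbounded pass-through block, so it doesn't occupy a processing slot.
public static IPropagatorBlock<TInput, TOutput>
    CreateTwoStageBlock<TInput, TMiddle, TOutput>(
    Func<TInput, Task<TMiddle>> processA,
    Func<TMiddle, Task<TOutput>> processB,
    TimeSpan deadTime,
    int maxDegreeOfParallelism)
{
    var stageOptions = new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    };

    var blockA = new TransformBlock<TInput, TMiddle>(processA, stageOptions);

    // Unbounded DOP: items waiting out their dead time don't block either stage.
    var delayBlock = new TransformBlock<TMiddle, TMiddle>(async item =>
    {
        await Task.Delay(deadTime);
        return item;
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });

    var blockB = new TransformBlock<TMiddle, TOutput>(processB, stageOptions);

    var linkOptions = new DataflowLinkOptions() { PropagateCompletion = true };
    blockA.LinkTo(delayBlock, linkOptions);
    delayBlock.LinkTo(blockB, linkOptions);

    return DataflowBlock.Encapsulate(blockA, blockB);
}

Note that with this approach the MaxDegreeOfParallelism applies to each stage separately rather than to the item as a whole, and the dead time becomes a fixed delay between the stages rather than something the processing delegate controls, which is probably the "not exactly what you're looking for" part.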

0 Answers