
I have an app that receives a stream of XML events from Kafka. These events have to be deserialized/parsed and otherwise converted before being handed, in order, to some business logic. (This logic then emits other events on the output side.)

The parsing/conversion code is stateless, while the domain code is stateful and has to receive events in order. These two steps are decoupled through a System.Threading.Channels Channel, so that the parsing step gets its own 'thread'/CPU (async task).

My challenge is that the parsing is CPU-heavy: it hits 100% CPU on one core and is thereby the bottleneck for service throughput. I've tried to use multi-threading / parallel processing, and this has improved the throughput somewhat. However, my approach seems inelegant and potentially carries a lot of overhead.

In the parsing step I've used Task.Run() to spawn a Task for each 'item', and then added the Task to the output queue ensuring the Tasks are added according to input order. The consumer then pulls tasks from the Channel one at a time, and waits for it to complete with a result before continuing.

This means I'm creating and submitting a large number of Tasks, and in general it seems like I'm using a lot of thread-coordination operations in the hot path.

I was hoping someone here would have a good approach for processing items in parallel while preserving the input order in the output.

Jannick
  • When you use `Task.Run`, the work is dispatched to the _Thread Pool_. It maintains a set of threads to which it, in turn, dispatches the work. When work arrives at the thread pool, it grabs an idle pooled thread, says "do this" and lets the thread do its thing. When the work completes, the thread goes back into the pool, waiting until more work comes its way. It's a pretty efficient process. By the way, your question could be a lot better if you posted some of your code. Also be aware that this site doesn't like _opinion-based_ questions (so "Good way to..." tends to attract close votes) – Flydog57 Mar 22 '23 at 20:25
  • Is there something in the XML itself that identifies the order (such as a timestamp or sequence number) and is that identifier also present in the transformed XML? – Eric J. Mar 22 '23 at 20:33
  • There is no trustworthy ordering field, but I could definitely introduce an artificial sequence number and ‘wrap’ the event object with the extra info. – Jannick Mar 22 '23 at 21:13
  • Thanks for the feedback, Flydog57. I don't think this is a matter of opinion though. I have most of my experience in the JVM world, and there my solution would definitely not be acceptable from a performance/efficiency standpoint. I’m just not able to find data structures or abstractions in .NET to do this right. Digging into TPL Dataflow again to see if I missed something. – Jannick Mar 22 '23 at 21:18
  • You could check [Reactivex](https://github.com/dotnet/reactive) – WerWet Mar 22 '23 at 21:59

2 Answers


So you have a Channel<Task<T>> as a conveyor belt: the producer adds tasks with channel.Writer.TryWrite(Task.Run(() => Parse(item))), and the consumer reads the tasks and awaits them one after the other:

await foreach (Task<T> task in channel.Reader.ReadAllAsync())
{
    T result = await task;
    // Do something with the result
}

This is quite a good setup. One disadvantage is that you are not controlling the degree of parallelism, so at times you might have too many Task.Run actions running in parallel, resulting in ThreadPool starvation that might negatively affect other parts of your application. You can solve this problem by scheduling the work with the more advanced Task.Factory.StartNew instead of Task.Run, passing as the scheduler argument the ConcurrentScheduler property of a shared ConcurrentExclusiveSchedulerPair instance.
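A minimal sketch of that throttling idea, assuming a hypothetical Parse that stands in for the real XML parsing, and an arbitrary concurrency limit of 2. The channel still carries Task<T> items in input order, but no more than two of them execute at any moment:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for the CPU-heavy parsing step.
static int Parse(string item) => item.Length;

// Shared scheduler that runs at most 2 tasks concurrently (the limit is an assumption).
TaskScheduler scheduler = new ConcurrentExclusiveSchedulerPair(
    TaskScheduler.Default, maxConcurrencyLevel: 2).ConcurrentScheduler;

var channel = Channel.CreateUnbounded<Task<int>>();

// Producer: schedule each parse on the throttled scheduler, enqueue the task in input order.
foreach (string item in new[] { "a", "bb", "ccc", "dddd" })
{
    Task<int> parseTask = Task.Factory.StartNew(
        () => Parse(item),
        CancellationToken.None,
        TaskCreationOptions.DenyChildAttach,
        scheduler);
    channel.Writer.TryWrite(parseTask);
}
channel.Writer.Complete();

// Consumer: await the tasks one by one, which yields the results in input order.
var results = new List<int>();
await foreach (Task<int> task in channel.Reader.ReadAllAsync())
{
    results.Add(await task);
}

Console.WriteLine(string.Join(", ", results)); // prints "1, 2, 3, 4"
```

Task.Run is roughly Task.Factory.StartNew with TaskScheduler.Default and DenyChildAttach, so the only behavioral change here is the scheduler.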

Another approach is to replace the channel with a TransformBlock<TInput,TOutput> from the TPL Dataflow library. This component combines an input buffer, an output buffer, and a processor that transforms the TInput to TOutput. It is equipped out of the box with parallel capabilities and order preservation. Here is an example:

TransformBlock<Item, Result> block = new(item =>
{
    return Parse(item);
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 2, // Configurable, the default is 1
    EnsureOrdered = true, // This is the default
});

The producer feeds the block with block.Post(item), and the consumer enumerates the output buffer of the block with the ReceiveAllAsync method:

await foreach (var result in block.ReceiveAllAsync())
{
    // Do something with the result
}
await block.Completion;

The await block.Completion; at the end is needed because the ReceiveAllAsync method currently has a bug, and doesn't propagate possible exceptions as part of the enumeration.
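Putting the pieces together, an end-to-end version could look roughly like the sketch below. Parse, the item count, the parallelism of 4 and the BoundedCapacity of 100 are all illustrative assumptions. The bounded capacity is optional; it is included only to show how SendAsync applies backpressure when the producer outpaces the parser.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for the CPU-heavy parsing step.
static string Parse(int item) => $"parsed-{item}";

var block = new TransformBlock<int, string>(item => Parse(item),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4, // assumption: tune to the available cores
        EnsureOrdered = true,       // output order matches input order
        BoundedCapacity = 100,      // optional: limits how many items the block holds
    });

// Producer: SendAsync waits while the block is full (Post would return false instead).
Task producer = Task.Run(async () =>
{
    for (int i = 0; i < 20; i++)
    {
        await block.SendAsync(i);
    }
    block.Complete(); // no more input; lets ReceiveAllAsync finish
});

// Consumer: results arrive in the order the items were sent.
var results = new List<string>();
await foreach (string result in block.ReceiveAllAsync())
{
    results.Add(result);
}

// Observe failures from either side.
await Task.WhenAll(producer, block.Completion);

Console.WriteLine($"{results.Count} results, first: {results[0]}, last: {results[^1]}");
```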

My expectation is that the TransformBlock approach should have less overhead, and consume less memory than your current setup. The TPL Dataflow library is advertised by Microsoft as suitable for "coarse-grained dataflow and pipelining tasks". This means that your Parse method should be chunky. In case it is feather-weight, like parsing a single number, most likely the benefits of parallelization will be negated by the synchronization overhead. In that case the solution might be to chunkify the work using a BatchBlock<T>.
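As a rough illustration of the chunking idea, the sketch below links a BatchBlock<T> in front of a TransformBlock that parses a whole array per invocation. The batch size of 100, the parallelism of 4 and the trivial Parse are assumptions made for the example:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical feather-weight parsing step.
static int Parse(string item) => int.Parse(item);

// Groups incoming items into arrays of up to 100 (batch size is an assumption).
var batcher = new BatchBlock<string>(batchSize: 100);

// Parses one whole batch per invocation, so the coordination cost is paid once per batch.
var parser = new TransformBlock<string[], int[]>(
    batch => batch.Select(x => Parse(x)).ToArray(),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, EnsureOrdered = true });

batcher.LinkTo(parser, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 250; i++)
{
    batcher.Post(i.ToString());
}
batcher.Complete(); // also flushes the final, partially filled batch

var results = new List<int>();
await foreach (int[] parsedBatch in parser.ReceiveAllAsync())
{
    results.AddRange(parsedBatch); // batches arrive in order, and items stay ordered within a batch
}
await parser.Completion;

Console.WriteLine($"{results.Count} items parsed");
```

The trade-off is latency: an item waits until its batch fills up (or the block is completed) before it is parsed.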

The TPL Dataflow library is not exactly cutting edge technology. It predates ValueTasks and so it doesn't take advantage of them. It also comes with some quirks, like swallowing OperationCanceledExceptions that might be thrown by the transform delegate. It is also very difficult to extend. Although it should be better than what you have already, it's not the absolutely optimal solution, but it might be good enough for your needs.
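The OperationCanceledException quirk can be seen with a small experiment like the one below (the values and the throwing condition are made up for the demonstration). The item whose transform throws simply produces no output, and the block still completes successfully:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var block = new TransformBlock<int, int>(x =>
{
    // Simulate a transform that is canceled for one particular item.
    if (x == 2) throw new OperationCanceledException();
    return x * 10;
});

for (int i = 1; i <= 3; i++)
{
    block.Post(i);
}
block.Complete();

var results = new List<int>();
await foreach (int result in block.ReceiveAllAsync())
{
    results.Add(result);
}
await block.Completion; // does not throw; the cancellation was swallowed

Console.WriteLine(string.Join(", ", results)); // the result for item 2 is missing
```

If a dropped item is not acceptable, one option is to catch the exception inside the delegate and return a result type that can carry the failure.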

Theodor Zoulias
  • Thank you, will try out both options. (And thanks for the elegant condensed example of my setup) – Jannick Mar 23 '23 at 07:50
  • @Jannick you can find examples of using the `ConcurrentExclusiveSchedulerPair` for throttling purposes [here](https://stackoverflow.com/questions/60893389/how-to-parallelize-multiple-heterogeneous-jobs-with-a-specific-degree-of-paralle/60896477#60896477) or [here](https://stackoverflow.com/questions/75112527/how-can-i-use-parallel-tasks-and-make-them-not-waiting-each-other/75115452#75115452). – Theodor Zoulias Mar 23 '23 at 12:03

I understand the requirement to be:

  • You read from a stream of data representing XML events that must be transformed.
  • You hand off each XML event to one of multiple tasks to perform transformation.
  • You wish to consume the transformed stream in the original order.

This Resequencer class is designed to do just that. It maintains a queue of sequence numbers (or other ordering keys) as they come in from the producer (the source of XML events in your case), buffers results from the consumers (the XML transformers) in a dictionary, and provides a method to read the results in order. The buffer only has to hold results that finished ahead of their turn, so with reasonably uniform processing times it should stay close to the number of consumer tasks you use.

This is a simple demo that can be enhanced in many ways including more robust error handling, and it could implement IEnumerable<TData>.

The implementation checks the dictionary for the next key in sequence immediately and, if that fails, once again each time one of the consumers completes.

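using System.Collections.Concurrent;
using System.Threading;

/// <summary>
/// This class assumes input is registered by a single producer processing data in
/// the desired order. It supports multiple consumers performing the processing,
/// and a single reader calling GetNext.
/// </summary>
public class Resequencer<TKey, TData> where TKey : notnull where TData : class
{
    // Written by the producer, read by the reader thread.
    volatile bool _producerDone = false;

    // Signaled whenever something changes that GetNext might be waiting for.
    readonly ManualResetEventSlim _resetEvent = new ManualResetEventSlim();

    readonly ConcurrentQueue<TKey> _identifiersInOrder = new ConcurrentQueue<TKey>();
    readonly ConcurrentDictionary<TKey, TData> _processedData =
        new ConcurrentDictionary<TKey, TData>();

    public void RegisterNextInputKey(TKey key)
    {
        _identifiersInOrder.Enqueue(key);
        _resetEvent.Set();
    }

    public void ProducerIsDone()
    {
        _producerDone = true;
        _resetEvent.Set();
    }

    public void RegisterConsumerOutput(TKey key, TData data)
    {
        _processedData.TryAdd(key, data);
        _resetEvent.Set();
    }

    /// <summary>
    /// Blocks until the next result in input order is available.
    /// Returns null once the producer is done and all results have been handed out.
    /// </summary>
    public TData? GetNext()
    {
        while (true)
        {
            // Reset before inspecting state, so a Set() racing with the checks below is not lost.
            _resetEvent.Reset();

            if (_identifiersInOrder.TryPeek(out var nextUp))
            {
                // Remove the result when handing it out, so the buffer does not grow forever.
                if (_processedData.TryRemove(nextUp, out var data))
                {
                    _identifiersInOrder.TryDequeue(out _);
                    return data;
                }
            }
            else if (_producerDone && _identifiersInOrder.IsEmpty)
            {
                return null;
            }

            // Sleep until a key is registered, a result arrives, or the producer finishes.
            _resetEvent.Wait();
        }
    }
}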

Here is a demonstration that builds upon the Channel sample code from Steve Gordon's article (linked in the code comments below).

static readonly Resequencer<int, Output> _resequencer = new Resequencer<int, Output>();

async Task Main()
{
    var channel = Channel.CreateUnbounded<Input>();

    // In this example, multiple consumers are needed to keep up with a fast producer

    Task outputTask = Task.Run(() => OutputOrderedResult());

    var producer1 = new Producer(channel.Writer, 1);
    var consumer1 = new Consumer(channel.Reader, 1);
    var consumer2 = new Consumer(channel.Reader, 2);
    var consumer3 = new Consumer(channel.Reader, 3);

    Task consumerTask1 = consumer1.ConsumeData(); // begin consuming
    Task consumerTask2 = consumer2.ConsumeData(); // begin consuming
    Task consumerTask3 = consumer3.ConsumeData(); // begin consuming

    Task producerTask1 = producer1.BeginProducing();

    await producerTask1.ContinueWith(_ => channel.Writer.Complete());
    

    await Task.WhenAll(outputTask, consumerTask1, consumerTask2, consumerTask3);
    
    Console.WriteLine("Done.");
}

void OutputOrderedResult()
{
    while (true)
    {
        var next = _resequencer.GetNext();
        if (next == null)
            break;
        Console.WriteLine($"SequenceNr: {next.SequenceNr} Data: {next.TransformedData}");
    }
}

internal class Input
{
    public int SequenceNr { get; set; }
    public string OriginalData { get; set; } = string.Empty;
}

internal class Output
{
    public int SequenceNr { get; set; }
    public string TransformedData { get; set; } = string.Empty;
}

// These classes based on https://github.com/stevejgordon/ChannelSample
// See also https://www.stevejgordon.co.uk/an-introduction-to-system-threading-channels
// Modifications relevant to this post commented in ALL CAPS
internal class Producer
{
    private readonly ChannelWriter<Input> _writer;
    private readonly int _identifier;

    public Producer(ChannelWriter<Input> writer, int identifier)
    {
        _writer = writer;
        _identifier = identifier;
    }

    public async Task BeginProducing()
    {
        for (var i = 0; i < 1000; i++)
        {
            await Task.Delay(50); // simulate producer building/fetching some data

            var input = new Input()
            {
                SequenceNr = i,
                OriginalData = $"Item {i}"
            };

            _resequencer.RegisterNextInputKey(input.SequenceNr);
            await _writer.WriteAsync(input);
        }

        _resequencer.ProducerIsDone();
    }
}

internal class Consumer
{
    private readonly ChannelReader<Input> _reader;
    private readonly int _identifier;
    private readonly Random rnd = new Random();

    public Consumer(ChannelReader<Input> reader, int identifier)
    {
        _reader = reader;
        _identifier = identifier;
    }

    public async Task ConsumeData()
    {
        while (await _reader.WaitToReadAsync())
        {
            if (_reader.TryRead(out var input))
            {
                await Task.Delay(rnd.Next(50, 200)); // simulate processing time
                var output = new Output() { SequenceNr = input.SequenceNr, TransformedData = new string(input.OriginalData.Reverse().ToArray()) };
                _resequencer.RegisterConsumerOutput(output.SequenceNr, output);
            }
        }
    }
}
Eric J.