
I am working on a .NET Core 3.0 web application and have decided to use System.Threading.Channels in a singleton service. The top level of my scoped request services injects this singleton to access its channels.

I have decided to use this pattern to decouple the requests (which produce live updates for other connected clients) from the execution of those updates.

The implementation of ONE channel within an object has many examples out there.

Can anyone tell me if it's possible/advisable to use multiple channels within my singleton?

I'm not yet running into any problems creating multiple channels and "starting" them when the singleton is created. I just haven't got to the point where I can test with multiple client requests hitting different channels on the singleton to see if it works well. (Or at all? ... )

My main motivation for using multiple channels is I want the singleton to do different things based on the type of the item in the channel.

public class MyChannelSingleton 
{
    public Channel<MyType> TypeOneChannel = Channel.CreateUnbounded<MyType>();
    public Channel<MyOtherType> TypeTwoChannel = Channel.CreateUnbounded<MyOtherType>();

    public MyChannelSingleton() 
    {
        StartChannels();
    }

    private void StartChannels() 
    {
        // discarded async tasks due to calling in ctor
        _ = StartTypeOneChannel();
        _ = StartTypeTwoChannel();
    }

    private async Task StartTypeOneChannel()
    {
        var reader = TypeOneChannel.Reader;

        while (await reader.WaitToReadAsync())
        {
            if (reader.TryRead(out MyType item))
            {
            // item is successfully read from channel
            }
        }
    }

    private async Task StartTypeTwoChannel()
    {
        var reader = TypeTwoChannel.Reader;

        while (await reader.WaitToReadAsync())
        {
            if (reader.TryRead(out MyOtherType item))
            {
            // item is successfully read from channel
            }
        }
    }
}

I also expect to never "Complete" the channels and have them available for the lifetime of the application.

pweber
  • The biggest issue with starting many threads - particularly processing ones - is that you might have more running than is useful. Often you can hand such work off to a thread pool. | I cannot remember having heard of this class before, so I have to look at it. – Christopher Nov 11 '19 at 19:30
  • The lack of any example code in there is really discouraging. But it seems to be a `Queue` with some special considerations for multitasking. In particular, FullMode and SingleReader/SingleWriter seem useful. I need to remember that one. – Christopher Nov 11 '19 at 19:39
  • It seems the main issue with channels is the balance between producer and consumer when you are using unbounded channels. If your consumers are fast enough at processing channel data to keep up with the producers, it's OK to use. – Eldar Nov 11 '19 at 19:42
  • I would think starting multiple channels runs counter to the very purpose of this class. It defaults to SingleReader and SingleWriter = false, because you are expected to share one instance between multiple readers and writers without having to worry about synchronisation. TryRead and TryWrite will take care of synchronisation for you (if those two properties stay false). | I would guess those functions use a simple locking mechanism - the same kind I would write, or that the concurrent collections have. – Christopher Nov 11 '19 at 19:43
  • I expect to have multiple writers to each of these channels with occasional bursts/peaks of writing in the long run. I need different channels for different types for my use case. Like you, I've looked through the sparse documentation and can't determine if using multiple channels in one object is good or bad... – pweber Nov 11 '19 at 19:56
  • Different channels for different purposes seems entirely fine. The main thing is that you do not use multiple channels for the same kind of purpose. One way to deal with peaks is to pick a high capacity. | The big issue will be balancing the writer and reader parts. You generally want to have as many readers as possible/necessary, the rest being writers. Unless of course the capacity is fully used up - then you need readers more than writers. Or higher capacity. Or eventually more server power, including a cluster. – Christopher Nov 11 '19 at 20:24
  • Okay, I finally figured out what unbounded channels are for. No fixed capacity limit, but you may run into an OOM as a result. If you have to deal with peaks and your highest capacity does not work, this seems to be the way to go. If the server still runs out of memory - then it is a hardware problem. | Do consider however that .NET has a hard limit for virtual memory. https://learn.microsoft.com/en-us/windows/win32/memory/memory-limits-for-windows-releases? | My guess is that after that, the GC becomes unable to handle this and you need to use direct memory management. – Christopher Nov 11 '19 at 20:35

3 Answers


You can use as many as you want, provided you use them correctly. In fact, using a background service (essentially a singleton) that exposes a processing pipeline is a very common way to use them in .NET Core.

Channels are not just async queues. They are similar to DataFlow blocks - they can be used to create processing pipelines, with each block/worker processing the data from an input buffer/ChannelReader and forwarding the results to an output buffer/ChannelWriter. DataFlow blocks handle asynchronous processing through tasks themselves. With channels, we need to handle the worker tasks ourselves.

A very important concept we need to keep in mind is that channels aren't accessed directly. In fact, in almost all cases they shouldn't even be exposed as fields or properties. In most cases, only a ChannelReader is needed. In some cases, eg at the head of a pipeline, a ChannelWriter may be exposed. Or not.

Individual workers/steps

A typical worker step would look like this:

private ChannelReader<MyType2> Step1(ChannelReader<MyType> reader, CancellationToken token = default)
{
    var channel = Channel.CreateUnbounded<MyType2>();
    var writer = channel.Writer;
    _ = Task.Run(async () => {
        await foreach (var item in reader.ReadAllAsync(token))
        {
             MyType2 result = ........;
             await writer.WriteAsync(result);
        }
    }, token).ContinueWith(t => writer.TryComplete(t.Exception));

    return channel.Reader;
}

Some things to note :

  • You can create multiple worker tasks if you want and use Task.WhenAll to wait for all workers to complete before closing the channel.
  • You can use a bounded channel to prevent a lot of messages accumulating if the pipeline isn't fast enough.
  • If the cancellation gets signalled, both reading from the input channel and the worker task will get cancelled.
  • When the worker task completes, whether because it was cancelled or threw, the channel will be closed.
  • When the "head" channel completes, completion will flow from one step to the next.
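
As a hedged sketch of the first point - reusing the hypothetical `MyType`/`MyType2` types from above, with `Process` standing in for whatever the step actually does - several workers can drain the same input reader, and the output channel is completed only after all of them finish:

```csharp
private ChannelReader<MyType2> Step1Parallel(
    ChannelReader<MyType> reader, int workerCount, CancellationToken token = default)
{
    var channel = Channel.CreateUnbounded<MyType2>();
    var writer = channel.Writer;

    // Each worker competes for items from the shared input reader.
    var workers = Enumerable.Range(0, workerCount).Select(_ => Task.Run(async () =>
    {
        await foreach (var item in reader.ReadAllAsync(token))
        {
            MyType2 result = Process(item); // Process is an assumed transform
            await writer.WriteAsync(result, token);
        }
    }, token)).ToArray();

    // Complete the output channel only when every worker is done,
    // propagating any failure to downstream readers.
    _ = Task.WhenAll(workers).ContinueWith(t => writer.TryComplete(t.Exception));

    return channel.Reader;
}
```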

Combining steps

Multiple steps can be combined by passing one's output reader to another's input reader, eg :

var cts=new CancellationTokenSource();

var step1=Step1(headReader,cts.Token);
var step2=Step2(step1,cts.Token);
var step3=Step3(step2,cts.Token);
...
await stepN.Completion;

The CancellationTokenSource can be used to end the pipeline prematurely or set a timeout as a safeguard against hanged pipelines.
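
For the timeout safeguard, `CancellationTokenSource.CancelAfter` is the standard mechanism; a minimal sketch, reusing the `Step1`/`headReader` names from above:

```csharp
using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromMinutes(5)); // safety net against a hung pipeline

var step1 = Step1(headReader, cts.Token);
try
{
    await step1.Completion;
}
catch (Exception)
{
    // Cancellation or a worker failure surfaces through the Completion task.
}
```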

The pipeline head

The "head" reader could come from an "adapter" method like :

private ChannelReader<T> ToChannel(IEnumerable<T> input, CancellationToken token)
{
    var channel = Channel.CreateUnbounded<T>();
    var writer = channel.Writer;
    foreach (var item in input)
    {
        if (token.IsCancellationRequested)
        {
            break;
        }
        writer.TryWrite(item);
    }
    //No-one else is going to complete this channel
    writer.Complete();
    return channel.Reader;
}

In the case of a background service, we could use a service method to "post" input to a head channel, eg :

class MyService
{
    Channel<MyType0> _headChannel;

    public MyService()
    {
        _headChannel=Channel.CreateBounded<MyType0>(5);
    }

    public async Task ExecuteAsync(CancellationToken token)
    {
        var step1=Step1(_headChannel.Reader,token);
        var step2=Step2(step1,token);        
        await step2.Completion;
    }

    public Task PostAsync(MyType0 input)
    {
        return _headChannel.Writer.WriteAsync(input).AsTask();
    }

    public void Stop()
    {
        _headChannel.Writer.TryComplete();
    }

...

}

I'm using method names that look like the BackgroundService method names on purpose. StartAsync or ExecuteAsync can be used to set up the pipeline. StopAsync can be used to signal its completion, eg when the end user hits Ctrl+C.

Another useful technique shown in the queued BackgroundService example is registering an interface that clients can use to post messages instead of accessing the service class directly, eg :

interface IQueuedService<T>
{
    Task PostAsync(T input);
}
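
A hedged sketch of that registration, assuming `MyService` from above derives from `BackgroundService` and implements `IQueuedService<MyType0>` (the exact wiring depends on your host setup):

```csharp
// In ConfigureServices: one instance serves all three roles.
services.AddSingleton<MyService>();
// The host runs ExecuteAsync in the background...
services.AddHostedService(sp => sp.GetRequiredService<MyService>());
// ...while clients inject only the narrow posting interface.
services.AddSingleton<IQueuedService<MyType0>>(sp => sp.GetRequiredService<MyService>());
```

This keeps the channel itself completely hidden: clients can post messages but cannot read from or complete the pipeline.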

Combined with System.Linq.Async

The ReadAllAsync() method returns an IAsyncEnumerable<T> which means we can use operators in System.Linq.Async like Where or Take to filter, batch or transform messages eg :

private ChannelReader<MyType> ActiveOnly(ChannelReader<MyType> reader, CancellationToken token = default)
{
    var channel = Channel.CreateUnbounded<MyType>();
    var writer = channel.Writer;
    _ = Task.Run(async () => {
        var inpStream = reader.ReadAllAsync(token)
                              .Where(it => it.IsActive);
        await foreach (var item in inpStream)
        {
             await writer.WriteAsync(item);
        }
    }, token).ContinueWith(t => writer.TryComplete(t.Exception));

    return channel.Reader;
}
Panagiotis Kanavos

A Channel<T> is just a thread-safe async queue. It doesn't do any processing by itself, it is just a passive in-memory FIFO storage. You can have as many of them as you want.

You could take advantage of the fact that a Channel exposes a Reader and a Writer separately, to limit the access of your class's clients to the minimum functionality they need. In other words, instead of exposing properties of type Channel<T>, you could consider exposing properties of type ChannelWriter<T> or ChannelReader<T>.

Also, creating unbounded channels should be done with caution. A single misused channel could quite easily make your application a victim of an OutOfMemoryException.

An alternative to exposing properties of type ChannelReader<T> could be exposing IAsyncEnumerable<T>s.
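
A minimal sketch of that shape; the wrapper class and member names here are illustrative, not from the question's code:

```csharp
public class UpdatesQueue
{
    // Bounded, so a slow consumer applies backpressure instead of growing memory.
    private readonly Channel<MyType> _channel = Channel.CreateBounded<MyType>(100);

    // Producers see only the writer...
    public ChannelWriter<MyType> Writer => _channel.Writer;

    // ...and consumers see only an async stream of items.
    public IAsyncEnumerable<MyType> ReadAllAsync(CancellationToken token = default)
        => _channel.Reader.ReadAllAsync(token);
}
```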

Theodor Zoulias
  • It doesn't guarantee FIFO. It is a flexible concept. The default channel *might* operate in FIFO, but you could write a channel that had 10 internal queues and satisfied 100% of the concepts and contracts offered by channel – Tim May 15 '21 at 16:32
  • @Tim if the FIFO behavior of the `Channel` class was not guaranteed, this class would be practically useless. In most applications it is vital that you get the items out in the same order that you put them in. I am talking about the `Channel` implementations provided by the built-in [`Channel`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels.channel) factory class. You can be 100% sure that these implementations have FIFO behavior. – Theodor Zoulias May 15 '21 at 21:40
  • @Tim it is imaginable that in the future new factory methods could be added, like `CreateUnboundedPrioritized(Func selector)`. If this happens I'll update my answer accordingly. But for the time being this discussion is purely academic. – Theodor Zoulias May 15 '21 at 21:49
  • Where is it ever even implied by the docs? FIFO is absolutely not a requirement for Producer Consumer. It isn't useless, I use it all the time. It is *roughly* FIFO, but with multiple writers and multiple readers there is absolutely no FIFO promise at all. If you need strict FIFO just use a Concurrent Queue. It already offers that. – Tim May 15 '21 at 21:50
  • It's not academic. It's a core concept. You can make your own channels. You can use the built in ones. None guarantee FIFO when interacting with multiple threads – Tim May 15 '21 at 21:51
  • The built in channels internally use a Deque and thus are roughly FIFO, and FIFO in single reader single writer scenarios. But beyond that... It's not a guarantee they offer so tomorrow they could use two double ended queues internally or ten, which would break anything relying on pure FIFO – Tim May 15 '21 at 21:55
  • @Tim the concept of FIFO is not applicable in a multithreaded concurrent environment anyway. I am talking about the behavior of the class when used with a single producer and a single consumer. Are you saying that the behavior is not guaranteed to be FIFO in this case, and if someone wants FIFO behavior they shouldn't use the `Channel` class, and use a `ConcurrentQueue` instead? – Theodor Zoulias May 15 '21 at 21:56
  • If someone isn't using multiple threads they can use a regular queue... If they are using multiple threads but need to guarantee FIFO, then a ConcurrentQueue will do that (with locks basically, internally, last I looked) or a regular queue with your own locking. Both will get you FIFO. Channels are for more general producer-consumer scenarios where a non-guarantee of perfect order is fine. I'm not saying you won't get FIFO with a Channel with a single reader and writer, but I am saying don't depend on it because the API contract doesn't guarantee it – Tim May 15 '21 at 22:03
  • @Tim as an example I have used [here](https://stackoverflow.com/questions/60375187/how-to-await-the-results-of-an-iasyncenumerabletaskt-with-a-specific-level/60375188#60375188) a `Channel` instance in the implementation of a method that converts an `IAsyncEnumerable>` to a `IAsyncEnumerable`, where it is absolutely essential that the order of the input values is preserved in the resulting sequence. Are you saying that my implementation is flawed because it relies on a non-documented behavior that can be subject to future changes? – Theodor Zoulias May 15 '21 at 22:13

Unfortunately I cannot find the source code. And calling the documentation sparse would be an understatement. So I can at best tell you "if it was my class, how I would do it".

The big issue with having multiple channels in memory - particularly unbounded ones - would be memory fragmentation causing an early OOM. Indeed, even with one unbounded channel, a big issue would be having to grow the collection. List<T> is little more than a wrapper around a T[] with some automatic growth support. Another issue with unbounded lists is that sooner or later you run out of indexes.

How would I solve this? A Linked List. In about 90% of all cases, a Linked List would be the last collection I would even consider. The remaining 10% are Queues and Queue like constructs. And channels look very much like a Queue. Of those 10% cases, in 9% I would just use whatever the Queue implementation does. This is the remaining 1%.

For random access the linked list is the worst possible collection. For queues it is doable. But at avoiding fragmentation-related OOMs in .NET? At minimising the cost of growth? At getting around the hard array limit? There the linked list is absolutely unbeatable.

And if it does not do that? It should be doable to make your own version of Channel that does, and just replace it.

Christopher
  • Channels aren't buffers (and have no reallocation issues), linked lists are more expensive for every operation compared to lists and far worse, they aren't thread safe. A ConcurrentQueue would be better but that doesn't provide async reading writing. The source code is on Github, not referencesource – Panagiotis Kanavos Nov 20 '19 at 11:35
  • BTW lists are cheaper at insertion than linked lists because insertion costs 0 until you have to resize the list. Something that can easily be avoided by just specifying a capacity. It doesn't even have to be accurate. Linked lists on the other hand have to allocate another node for every insertion. – Panagiotis Kanavos Nov 20 '19 at 11:37
  • @PanagiotisKanavos I do not see me calling Channel a buffer. I compared it to a queue, which is the closest equivalent. I am at best uncertain if a ConcurrentQueue would be better. | Lists are only cheaper during insertion if they do not get too long. While growth happens rarely, the cost of any growth operation scales linearly with how full it already is. That little impact of having to look up and re-assign a reference is easily on par with having to copy 100 elements to the newly grown list. And we were talking *explicitly* about an unbounded Channel. So baby is gonna grow big. – Christopher Nov 20 '19 at 15:31
  • The OOM issue applies only to List, which uses an array as a buffer. Channels don't use buffers and hence have no such issues. And the actual benchmarks show that linked lists are a lot more expensive than the absolute *0* cost of setting a value in an array, especially once you take into account the cost of allocations and garbage collection. Never mind CPU caching and locality issues that make using a buffer a *lot* faster than having to go back to main memory to find the next node. – Panagiotis Kanavos Nov 20 '19 at 15:35
  • `I am at best uncertain if a ConcurrentQueue would be better`. Infinitely better. Linked lists aren't thread safe, period. Concurrent queues are. – Panagiotis Kanavos Nov 20 '19 at 15:36
  • That this answer is wrong and not even relevant to the question - linked lists aren't thread safe. The source code is available in the CoreFX repo on Github along with the entire .NET Core source – Panagiotis Kanavos Nov 20 '19 at 15:45
  • @PanagiotisKanavos: "linked lists aren't thread safe" Which is why I **never said anything even close to that**. The mechanism for thread safety was irrelevant for this discussion. It was *entirely* about the memory usage and similar issues during high-request cases, with multiple channels. | TL;DR: You disagree with stuff I never said, never implied, and honestly I cannot figure out how you read it into my post. – Christopher Nov 20 '19 at 17:03