Questions tagged [system.threading.channels]

Provides a set of highly scalable, low/no-allocation synchronization data structures for passing data between producers and consumers asynchronously.


System.Threading.Channels contains the abstractions and base implementations for async-friendly queues, as a replacement for the synchronous, thread-safe BlockingCollection and ConcurrentQueue.

It's also a partial replacement for the async-friendly queues of System.Threading.Tasks.Dataflow (which are implemented internally by BufferBlock and other blocks).

The API Proposal is probably the only official documentation at the moment.

Exploring System.Threading.Channels offers a good overview of channels.

Decoupling

Publishers and subscribers can only access a channel through a ChannelWriter and ChannelReader respectively. This decouples publishers, subscribers and the channel implementation itself.
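The following is a minimal sketch of that decoupling, not a definitive pattern. The class and method names (DecouplingSketch, ProduceAsync, SumAsync, RunAsync) are hypothetical and chosen only for illustration; it assumes .NET Core 3.0 or later so that ReadAllAsync is available. The producer method receives only a ChannelWriter and the consumer method only a ChannelReader, so neither depends on how the channel was created.

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;

public static class DecouplingSketch
{
    // The producer only sees the write side of the channel.
    public static async Task ProduceAsync(ChannelWriter<int> writer, int count)
    {
        for (int i = 0; i < count; i++)
        {
            await writer.WriteAsync(i);
        }
        // Signal that no more items will be written, so readers can finish.
        writer.Complete();
    }

    // The consumer only sees the read side of the channel.
    public static async Task<int> SumAsync(ChannelReader<int> reader)
    {
        int sum = 0;
        await foreach (int item in reader.ReadAllAsync())
        {
            sum += item;
        }
        return sum;
    }

    // Wires both sides together; only this method knows the channel type.
    public static async Task<int> RunAsync()
    {
        Channel<int> channel = Channel.CreateUnbounded<int>();
        Task producer = ProduceAsync(channel.Writer, 5);
        int sum = await SumAsync(channel.Reader);
        await producer;
        return sum; // 0 + 1 + 2 + 3 + 4 = 10
    }
}
```

Swapping Channel.CreateUnbounded for Channel.CreateBounded in RunAsync would not require any change to the producer or the consumer.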

Backpressure

A Bounded Channel allows only a specific number of items. Its behaviour when full depends on the BoundedChannelFullMode option. By default, publishers will have to wait asynchronously if a channel becomes full.

Other values of the BoundedChannelFullMode enum allow different behaviours. For example, DropOldest can be used to create a circular buffer. DropWrite can be used to throttle publishers by rejecting overflowing messages.
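As a rough illustration of these modes, the sketch below uses a capacity of 2 with DropOldest and a capacity of 1 with the default Wait mode. The class and method names (BackpressureSketch, CircularBuffer, WaitModeRejectsSynchronousWrite) are hypothetical; the capacities are arbitrary values picked for the example.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;

public static class BackpressureSketch
{
    // DropOldest: the channel behaves like a circular buffer of size 2.
    public static List<int> CircularBuffer()
    {
        var options = new BoundedChannelOptions(2)
        {
            FullMode = BoundedChannelFullMode.DropOldest
        };
        Channel<int> channel = Channel.CreateBounded<int>(options);

        // Writing 1..4 into a channel of capacity 2 evicts 1 and then 2.
        for (int i = 1; i <= 4; i++)
        {
            channel.Writer.TryWrite(i);
        }
        channel.Writer.Complete();

        // Items already buffered can still be read after completion.
        var items = new List<int>();
        while (channel.Reader.TryRead(out int item))
        {
            items.Add(item);
        }
        return items; // contains 3 and 4
    }

    // Wait (the default): a synchronous TryWrite fails when the channel is
    // full, whereas WriteAsync would wait asynchronously for free space.
    public static bool WaitModeRejectsSynchronousWrite()
    {
        Channel<int> channel = Channel.CreateBounded<int>(1);
        channel.Writer.TryWrite(1);        // fills the channel
        return channel.Writer.TryWrite(2); // false: no room left
    }
}
```

In Wait mode, a publisher that uses WriteAsync instead of TryWrite is paused until a consumer reads an item, which is how the backpressure reaches the publisher.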

65 questions
8
votes
1 answer

Does Channel support multiple ChannelReaders and ChannelWriters, or only one of each?

The documentation for Channel.CreateUnbounded says: Creates an unbounded channel usable by any number of readers and writers concurrently. However Channel has properties for a single ChannelReader and ChannelWriter only, and there doesn't appear…
Mr. Boy
  • 60,845
  • 93
  • 320
  • 589
7
votes
0 answers

Difference between a ChannelReader and IAsyncEnumerable

I'll keep this one short. After reading the article "Use streaming in ASP.NET Core SignalR" (first part, server to client streaming), what is the difference between a ChannelReader[T] and IAsyncEnumerable[T]?
Cowborg
  • 2,645
  • 3
  • 34
  • 51
6
votes
1 answer

How to correctly dispose a client stream if the connection has disconnected?

I'm using Microsoft.AspNetCore.SignalR 2.1 v1.0.4 and have a ChannelReader stream being consumed by a TypeScript client using v1.0.4. The channel surfaces event data specific to a single entity, so it's expected the client would subscribe to a…
Mark
  • 1,059
  • 13
  • 25
4
votes
2 answers

Parallel receiving data from several IAsyncEnumerable streams

I have a case where I need to receive data from more than one IAsyncEnumerable source. For better performance, this should be done in parallel. I have written the following code to achieve this using AsyncAwaitBestPractices,…
4
votes
2 answers

Does ChannelReader.ReadAllAsync throw any exceptions when being canceled by a CancellationToken?

Does ChannelReader.ReadAllAsync throw any exceptions when being canceled by a CancellationToken? It doesn't seem to be throwing OperationCanceledException/TaskCanceledException? I know if these two methods were called in a fire and forget manner,…
nop
  • 4,711
  • 6
  • 32
  • 93
4
votes
1 answer

How to read the remaining items in a Channel when there are fewer than the batch size and no new items arrive within X minutes?

I am using Channel from System.Threading.Channels and want to read items in batches (5 items). I have a method like the one below: public class Batcher { private readonly Channel _channel; public Batcher() { …
user584018
  • 10,186
  • 15
  • 74
  • 160
3
votes
1 answer

Channels: Is it possible to broadcast the same message from a single producer to multiple consumers?

Is it possible for multiple consumers to receive the same message? I have a single producer which produces tick data (stock market) from web sockets. I have a single consumer that currently receives 1000 messages per second, and it works great. But now I would like to…
3
votes
2 answers

Multiple consumers without losing messages

I have a firehose of stuff coming through a redis pub/sub and I need to distribute it to a number of websocket connections, so basically whenever a message comes from redis, it needs to be distributed through all websockets connections. I want…
nop
  • 4,711
  • 6
  • 32
  • 93
3
votes
2 answers

ChannelReader.ReadAllAsync(CancellationToken) not actually cancelled mid-iteration

I've been working on a feature that queues time consuming work in a channel, and there I iterate the channel using e.g. await foreach(var item in channel.Reader.ReadAllAsync(cancellationToken)) {...} I was expecting that when cancellation is…
3
votes
1 answer

Pipelines buffer preserving until processing is complete

I am researching the possibility of using pipelines for processing binary messages coming from the network. The binary messages I will be processing come with a payload, and it is desirable to keep the payload in its binary form. The idea is to read out…
NullReference
  • 862
  • 1
  • 11
  • 27
3
votes
1 answer

How to avoid an InvalidOperationException being thrown by ChannelReader.WaitToReadAsync?

I wrote an asynchronous queue using System.Threading.Channels, but when I ran the program for testing, the following exception was thrown at a random time and the worker thread stopped. System.InvalidOperationException: The asynchronous operation has not…
3
votes
0 answers

Using System.Threading.Channels with different TRead/TWrite

Channels are instantiated using the static methods CreateUnbounded or CreateBounded. The created channel will accept writes and emit reads of TType. In my use case I want a channel that writes TType but emits objects. It looks like…
pardahlman
  • 1,394
  • 10
  • 22
2
votes
1 answer

Avoid consumers processing the same entry

I can't prevent multiple consumers from processing the same record in a queue using the System.Threading.Channels library. When the writer enqueues some model, one of the multiple consumers starts to process it. During this, the writer goes to the database and reads the…
2
votes
2 answers

Which is the fastest way to tell if a Channel is empty?

I am consuming a Channel in an await foreach loop, and on each iteration I want to know if the channel is empty. Channels don't have an IsEmpty property, so I am seeing two ways to get this information: the Count property and the TryPeek…
Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
2
votes
1 answer

.NET Problem using System.Threading.Channels.Channel efficiently

Consider the following code: var channel = Channel.CreateUnbounded(new UnboundedChannelOptions()); var t1 = Task.Run(async () => { DateTime start = DateTime.Now; for (int i = 0; i < 100000000; i++) { await…
rory.ap
  • 34,009
  • 10
  • 83
  • 174