I need to keep elements in a list/array, so that I can access the last n elements. I.e. I add to it all the time, and should be able to get from it the last elements if needed. (Like a security camera only keeping the last n hours.) I was thinking of having a List or an array, and maintaining an index where the index wraps around.

But this seems like a useful construction so I was wondering if it already exists in .Net. No need to reinvent the wheel if it does.

So does such a thing exist built in to .Net?

EDIT

Apparently, there is a question of how to implement this here. That question and top answers are from more than 10 years ago. That does not answer my question of whether it is built-in today. A lot of what we use today didn't exist then. Also, using Queue won't help because I need to be able to access every element (up to n), not only the last one.

ispiro
  • Does this answer your question? [Fixed size queue which automatically dequeues old values upon new enques](https://stackoverflow.com/questions/5852863/fixed-size-queue-which-automatically-dequeues-old-values-upon-new-enques) – Bassinator Jul 21 '22 at 20:02
  • [BoundedChannel](https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels.channel.createbounded?view=net-6.0) is a specialized asynchronous collection for producer/consumer scenarios that can be configured to [drop the newest or oldest](https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels.boundedchannelfullmode?view=net-6.0) items when full – Panagiotis Kanavos Jul 21 '22 at 20:15
  • As for wrapping, that's called a circular buffer. If you google you'll find a lot of implementations. At its simplest, non-thread-safe form it can be just a fixed-length array that uses modulo arithmetic to determine the next index. Such a thing *doesn't* require a BCL implementation – Panagiotis Kanavos Jul 21 '22 at 20:25
  • If you search NuGet for `circular buffer` you'll get 17 hits. One of the libraries worth looking into is [C5](https://github.com/sestoft/C5) which contains dozens of advanced collections, including a [CircularQueue](https://github.com/sestoft/C5/blob/master/C5/Arrays/CircularQueue.cs) that uses a buffer internally. – Panagiotis Kanavos Jul 21 '22 at 20:47
  • @PanagiotisKanavos BoundedChannel seems like exactly what I was looking for. You can transform your comment into an answer. (I mentioned circular just because I assumed that's how it would be implemented.) – ispiro Jul 21 '22 at 20:57

1 Answer

This describes a circular or ring buffer. Implementing one is relatively easy and googling or searching NuGet returns many implementations, including the CircularQueue in the C5 library.
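
To make "relatively easy" concrete, here is a minimal, non-thread-safe sketch of a ring buffer built on a fixed-length array with modulo arithmetic. The type name RingBuffer<T> and its members Add and Last are made up for this illustration; they are not part of the BCL or of C5.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical type for illustration only; not a BCL or C5 API.
// Not thread-safe: guard it with a lock if several threads use it.
public sealed class RingBuffer<T>
{
    private readonly T[] _items;
    private int _next;   // slot where the next item will be written
    private int _count;  // number of valid items, never more than Capacity

    public RingBuffer(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        _items = new T[capacity];
    }

    public int Capacity => _items.Length;
    public int Count => _count;

    // Adds an item; once the buffer is full this overwrites the oldest one.
    public void Add(T item)
    {
        _items[_next] = item;
        _next = (_next + 1) % _items.Length;
        if (_count < _items.Length) _count++;
    }

    // Returns up to n of the most recent items, oldest first, without removing them.
    public List<T> Last(int n)
    {
        if (n < 0) throw new ArgumentOutOfRangeException(nameof(n));
        int take = Math.Min(n, _count);
        var result = new List<T>(take);
        // Step back 'take' slots from the write position, wrapping around.
        int start = (_next - take + _items.Length) % _items.Length;
        for (int i = 0; i < take; i++)
        {
            result.Add(_items[(start + i) % _items.Length]);
        }
        return result;
    }
}
```

With a capacity of 3, adding 1 through 5 and then calling Last(2) yields 4 and 5, and the items stay in the buffer for later reads.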

For asynchronous operations, one can use a BoundedChannel configured with BoundedChannelFullMode.DropOldest:

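using System.Threading.Channels;

// BoundedChannelOptions has no parameterless constructor;
// the capacity is passed as a constructor argument.
var options = new BoundedChannelOptions(10)
    {
        FullMode = BoundedChannelFullMode.DropOldest
    };
var channel = Channel.CreateBounded<float>(options);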

var writer=channel.Writer;
while(!cancellationToken.IsCancellationRequested)
{
   var value=await getSomeValue(cancellationToken);
   await writer.WriteAsync(value,cancellationToken);
}
writer.Complete();

Channels are specialized asynchronous queues for producer/consumer scenarios, so their API and behavior are quite different from normal queues.

  • Writers and readers (producers and consumers) use different APIs: ChannelWriter to write and ChannelReader to read. The Channel class itself has no useful members beyond the Reader and Writer properties.
  • Writing and reading are explicitly asynchronous.
  • Both write and read order are preserved.
  • Channels can be unbounded or bounded. An unbounded channel can grow without limit, which can be a problem when readers are slower than writers. A bounded channel that reaches its capacity can either block writers (asynchronously) or discard the oldest item, the newest item, or the item being written.
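
The DropOldest behavior described in the last bullet can be seen in a small synchronous sketch. The class DropOldestDemo and its method WriteThenDrain are hypothetical names used only for this illustration; the channel calls themselves (Channel.CreateBounded, TryWrite, Complete, TryRead) are the standard System.Threading.Channels APIs. Note that reading from a channel removes the item, so this is a queue of the latest n items rather than a buffer you can inspect repeatedly.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;

// Hypothetical helper for illustration only.
public static class DropOldestDemo
{
    // Writes 1..total into a channel bounded to 'capacity', then drains what is left.
    public static List<int> WriteThenDrain(int capacity, int total)
    {
        var options = new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.DropOldest
        };
        var channel = Channel.CreateBounded<int>(options);

        for (int i = 1; i <= total; i++)
        {
            // With DropOldest, a write to a full channel evicts the oldest
            // item instead of waiting or failing.
            channel.Writer.TryWrite(i);
        }
        channel.Writer.Complete();

        // Items already in the channel can still be read after Complete().
        var remaining = new List<int>();
        while (channel.Reader.TryRead(out var item))
        {
            remaining.Add(item);
        }
        return remaining;
    }
}
```

Calling DropOldestDemo.WriteThenDrain(3, 5) returns 3, 4 and 5: the first two items were discarded as newer ones arrived.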

Channels are used, e.g., in SignalR server-to-client streaming to produce server events. The doc example uses an unbounded, owned channel, i.e. a channel whose lifetime is controlled by the producer method. That could just as easily be a bounded channel with DropOldest, which is useful if we only want to send "fresh" events to potentially slow subscribers:

public ChannelReader<int> Counter(
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    // The capacity is a constructor argument; there is no parameterless constructor.
    var options = new BoundedChannelOptions(10)
    {
        FullMode = BoundedChannelFullMode.DropOldest
    };
    var channel = Channel.CreateBounded<int>(options);

    // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
    // for all the items to be written before returning the channel back to
    // the client.
    _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

    return channel.Reader;
}

private async Task WriteItemsAsync(
    ChannelWriter<int> writer,
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    Exception localException = null;
    try
    {
        for (var i = 0; i < count; i++)
        {
            await writer.WriteAsync(i, cancellationToken);

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
    catch (Exception ex)
    {
        localException = ex;
    }
    finally
    {
        writer.Complete(localException);
    }
}

If we set the capacity to 1, we get "publish latest" behavior:

public ChannelReader<int> CounterLatest(
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    // A capacity of 1 keeps only the most recently written item.
    var options = new BoundedChannelOptions(1)
    {
        FullMode = BoundedChannelFullMode.DropOldest
    };
    var channel = Channel.CreateBounded<int>(options);

    // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
    // for all the items to be written before returning the channel back to
    // the client.
    _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

    return channel.Reader;
}

A consumer uses the ChannelReader and, e.g., the IAsyncEnumerable<T> returned by ChannelReader.ReadAllAsync to read messages:

async Task ConsumeAsync(ChannelReader<int> input,CancellationToken cancellationToken)
{
    await foreach(var msg in input.ReadAllAsync(cancellationToken))
    {
        await DoSomething(msg);
    }
}
Panagiotis Kanavos