
Suppose I have some IEnumerator<T> which does a fair amount of processing inside the MoveNext() method.

The code consuming from that enumerator does not just consume as fast as data becomes available, but occasionally waits (the specifics of which are irrelevant to my question) in order to synchronize the time at which it needs to resume consumption. But when it does make the next call to MoveNext(), it needs the data as fast as possible.

One way would be to pre-consume the whole stream into a list or array for instant enumeration. That would be a waste of memory, however, since at any single point in time only one item is in use, and it would be prohibitive in cases where the whole data set does not fit into memory.

So is there something generic in .NET that wraps an enumerator/enumerable in such a way that it asynchronously pre-iterates the underlying enumerator a couple of items in advance and buffers the results, so that it always has a number of items available in its buffer and the calling MoveNext never has to wait? Obviously items consumed, i.e. iterated over by a subsequent MoveNext from the caller, would be removed from the buffer.
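
For illustration, here is roughly how I imagine such a wrapper being used. Everything in this snippet is hypothetical: Buffered, Frame, GetFrames, Consume and WaitUntilNextItemIsDue are made-up names, not an existing API.

IEnumerator<Frame> source = GetFrames();
using (var buffered = source.Buffered(prefetch: 5)) // pre-fetch up to 5 items in the background
{
    while (buffered.MoveNext())        // should return immediately while the buffer is non-empty
    {
        Consume(buffered.Current);
        WaitUntilNextItemIsDue();      // consumer-side pacing; the buffer refills meanwhile
    }
}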

N.B. Part of what I'm trying to do is also called backpressure and, in the Rx world, has already been implemented in RxJava and is under discussion in Rx.NET. Rx (observables that push data) can be considered the opposite approach to enumerators (which let the consumer pull data). Backpressure is relatively easy in the pulling approach, as my answer shows: just pause consumption. It is harder when pushing, as that requires an additional feedback mechanism.

Evgeniy Berezovsky
  • This looks like a duplicate of your earlier question, which you said you would edit: http://stackoverflow.com/questions/30700154/a-pre-buffering-enumerator – Asad Saeeduddin Jun 08 '15 at 15:45
  • @Asad I deleted the old question and created this one instead, because the comments to the existing one do not match the question in its current form at all. This one I hope makes it more clear what I actually want. – Evgeniy Berezovsky Jun 08 '15 at 23:03
  • The question doesn't appear to have changed noticeably. You still have the problem that pre-buffering a couple of items will just result in a delay twice as long half as frequently when calling MoveNext once the consumer exhausts the buffer. – Asad Saeeduddin Jun 08 '15 at 23:59
  • The consumer does not, in general, exhaust the buffer; that in fact *is* the important qualification missing from the first version of the question. To wit: `The code consuming from that enumerator does not just consume as fast as data is available, but occasionally waits [..] But when it does the next call to MoveNext(), it needs the data as fast as possible.` – Evgeniy Berezovsky Jun 09 '15 at 00:20
  • Ok, but unless the wait in the consumer is perfectly synchronized with the buffer size, the consumer will not get the data "as fast as possible". It will still have laggy iteration on top of what its own delay is. – Asad Saeeduddin Jun 09 '15 at 01:15
  • @Asad Re your last comment: Both your solution and mine have a fixed buffer size, which in fact is sufficient for my current problem, but it does not have to be fixed. The pre-buffering code could e.g. adapt to a consumer that threatens to completely exhaust the buffer by increasing the buffer size when consumption stays fast for a long time. That would be hard to do with a BlockingCollection (with its fixed bound), but could be useful for more dynamic use cases, and would remove the need for the consumer to commit to a fixed size. – Evgeniy Berezovsky Jun 09 '15 at 03:21

2 Answers


A more concise alternative to your custom enumerable class is to do this:

public static IEnumerable<T> Buffer<T>(this IEnumerable<T> source, int bufferSize)
{
    // Bounded collection: Add blocks the producer once bufferSize items are queued,
    // which caps memory use. Requires System.Collections.Concurrent and System.Threading.Tasks.
    var queue = new BlockingCollection<T>(bufferSize);

    Task.Run(() => {
        foreach (var i in source) queue.Add(i);
        queue.CompleteAdding(); // lets the consuming enumerable terminate
    });

    return queue.GetConsumingEnumerable();
}

This can be used as:

var slowEnumerable = GetMySlowEnumerable();
var buffered = slowEnumerable.Buffer(10); // Populates up to 10 items on a background thread
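
One caveat worth noting (my addition, not part of the original answer): if iterating source throws inside the Task.Run delegate, the task faults silently and CompleteAdding is never reached, so the consumer can block forever. A minimal sketch of one way to surface such errors, with BufferSafe as a made-up name:

public static IEnumerable<T> BufferSafe<T>(this IEnumerable<T> source, int bufferSize)
{
    var queue = new BlockingCollection<T>(bufferSize);
    Exception error = null;

    Task.Run(() => {
        try
        {
            foreach (var i in source) queue.Add(i);
        }
        catch (Exception e)
        {
            error = e; // capture the producer-side failure
        }
        finally
        {
            queue.CompleteAdding(); // always unblock the consumer
        }
    });

    foreach (var item in queue.GetConsumingEnumerable())
        yield return item;

    if (error != null) // surface the failure where the consumer can catch it
        throw new InvalidOperationException("Buffering producer failed.", error);
}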
Asad Saeeduddin
  • Good solution. I knew and have used BlockingCollection (and stopped using it in highly concurrent code that needs to be fast, because [it is slow compared to ConcurrentQueue+AutoResetEvent](http://stackoverflow.com/a/29269149/709537)), but I didn't know about `GetConsumingEnumerable`, and in this simple producer/consumer scenario it looks like the perfect solution. So I don't have to roll my own after all. – Evgeniy Berezovsky Jun 09 '15 at 01:46
  • @EugeneBeresovsky :) It's just that I feel the optimal buffer size here is whatever you ultimately plan on consuming (i.e. buffer everything, instead of a short distance ahead). I mean, if you're spawning another thread anyway, let it keep working and prepare as far ahead as it can. – Asad Saeeduddin Jun 09 '15 at 01:50
  • @EugeneBeresovsky Wasn't aware BlockingCollection was slow, but you could probably use a `ConcurrentQueue` and lock-synchronize a size check with the `Add` (see the sketch after these comments). – Asad Saeeduddin Jun 09 '15 at 01:53
  • Not sure why you assume the consumer cannot figure out an appropriate buffer size. I have up to 10 of these enumerators in my app, each of which creates up to 10GB of data. This is data that gets replayed at a certain speed (and can be paused by the user etc.). Calculating the necessary buffer size depends on replay speed and items/sec and is thus straightforward. The benefit you fail to see is a) immensely reduced memory consumption (compared to loading it all in at the same time) and b) reduced latency (compared to reading in on demand without pre-buffering). – Evgeniy Berezovsky Jun 09 '15 at 01:54
  • @EugeneBeresovsky I see. I guess I'm just projecting my own experience onto this. I've never had to deal with datasets large enough to impose memory constraints, but have frequently had to deal with throughput issues. – Asad Saeeduddin Jun 09 '15 at 01:55
  • Fair enough. I appreciate your help, although I regret the lack of imagination displayed by some SO users who reflexively shoot off a "why would you need this?" comment when reading about a use case they haven't encountered themselves. In this case my question was about an actual problem, but personally I don't even mind if someone tries to figure out a practical solution to a purely academic problem on SO. – Evgeniy Berezovsky Jun 09 '15 at 01:57
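
The ConcurrentQueue alternative mentioned in the comments above could look roughly like this. This is a sketch of mine, not code from the thread, and it uses two SemaphoreSlim counters instead of an AutoResetEvent to keep the bounding logic simple:

class BoundedBuffer<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly SemaphoreSlim _freeSlots;                    // blocks the producer when full
    private readonly SemaphoreSlim _items = new SemaphoreSlim(0); // blocks the consumer when empty

    public BoundedBuffer(int capacity)
    {
        _freeSlots = new SemaphoreSlim(capacity, capacity);
    }

    public void Add(T item)
    {
        _freeSlots.Wait();     // wait until the buffer has room
        _queue.Enqueue(item);
        _items.Release();      // signal the consumer
    }

    public T Take()
    {
        _items.Wait();         // wait until an item is available
        T item;
        _queue.TryDequeue(out item); // guaranteed to succeed after the Wait
        _freeSlots.Release();  // hand the slot back to the producer
        return item;
    }
}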

There are different ways to implement this yourself, and I decided to use

  • a single dedicated thread per enumerator that does the asynchronous pre-buffering
  • a fixed number of elements to pre-buffer

This is perfect for my case at hand (only a few, very long-running enumerators), but e.g. creating a thread might be too heavyweight if you use lots and lots of enumerators, and the fixed number of elements may be too inflexible if you need something more dynamic, based perhaps on the actual content of the items.

I have so far only tested its main feature, and some rough edges may remain. It can be used like this:

int bufferSize = 5;
IEnumerable<int> en = ...;
foreach (var item in new PreBufferingEnumerable<int>(en, bufferSize))
{
    ...
}

Here's the gist of the Enumerator:

class PreBufferingEnumerator<TItem> : IEnumerator<TItem>
{
    private readonly IEnumerator<TItem> _underlying;
    private readonly int _bufferSize;
    private readonly Queue<TItem> _buffer;
    private bool _done;
    private bool _disposed;

    public PreBufferingEnumerator(IEnumerator<TItem> underlying, int bufferSize)
    {
        _underlying = underlying;
        _bufferSize = bufferSize;
        _buffer = new Queue<TItem>();
        Thread preBufferingThread = new Thread(PreBufferer) { Name = "PreBufferingEnumerator.PreBufferer", IsBackground = true };
        preBufferingThread.Start();
    }

    private void PreBufferer()
    {
        while (true)
        {
            lock (_buffer)
            {
                while (_buffer.Count == _bufferSize && !_disposed)
                    Monitor.Wait(_buffer);
                if (_disposed)
                    return;
            }
            if (!_underlying.MoveNext())
            {
                lock (_buffer)
                {
                    _done = true;
                    Monitor.Pulse(_buffer); // wake a consumer that may be blocked in MoveNext
                }
                return;
            }
            var current = _underlying.Current; // do outside lock, in case underlying enumerator does something inside get_Current()
            lock (_buffer)
            {
                _buffer.Enqueue(current);
                Monitor.Pulse(_buffer);
            }
        }
    }

    public bool MoveNext()
    {
        lock (_buffer)
        {
            while (_buffer.Count == 0 && !_done && !_disposed)
                Monitor.Wait(_buffer);
            if (_buffer.Count > 0)
            {
                Current = _buffer.Dequeue();
                Monitor.Pulse(_buffer); // so PreBufferer thread can fetch more
                return true;
            }
            return false; // _done || _disposed
        }
    }

    public TItem Current { get; private set; }

    public void Dispose()
    {
        lock (_buffer)
        {
            if (_disposed)
                return;
            _disposed = true;
            _buffer.Clear();
            Current = default(TItem);
            Monitor.PulseAll(_buffer);
        }
    }

    // Reset() and the non-generic IEnumerator.Current (required by the interface)
    // are omitted here for brevity, as is disposal of the underlying enumerator.
}
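
For completeness, the PreBufferingEnumerable used in the usage snippet above is not shown in the original answer; a minimal sketch of what it presumably looks like (my reconstruction) is:

class PreBufferingEnumerable<TItem> : IEnumerable<TItem>
{
    private readonly IEnumerable<TItem> _underlying;
    private readonly int _bufferSize;

    public PreBufferingEnumerable(IEnumerable<TItem> underlying, int bufferSize)
    {
        _underlying = underlying;
        _bufferSize = bufferSize;
    }

    public IEnumerator<TItem> GetEnumerator()
    {
        // Each enumeration gets its own enumerator and thus its own pre-buffering thread.
        return new PreBufferingEnumerator<TItem>(_underlying.GetEnumerator(), _bufferSize);
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}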
Evgeniy Berezovsky