new poster here so I hope this makes sense ...

I need to create a collection that I can remove items from in sequence (basically stock market time series data). The data producer is multi-threaded and doesn't guarantee that the data will come in sequence.

I've looked all around for a solution, but the only thing I can come up with is to create my own custom dictionary, using ConcurrentDictionary and implementing the IProducerConsumerCollection interface so it can be used with BlockingCollection.

The code I have below does work, but produces an error

System.InvalidOperationException: The underlying collection was modified from outside of the BlockingCollection

when using the GetConsumingEnumerable() for loop, and the next key in the sequence is not present in the dictionary. In this instance I would like to wait for a specified amount of time and then attempt to take the item from the queue again.

My question is:

  • What's the best way to handle the error when the next key is not present? At the moment it seems that handling the error would require exiting the loop. Perhaps GetConsumingEnumerable() is not the right way to consume, and a while loop would work better?

Code is below - any help/ideas much appreciated.

IProducerConsumerCollection implementation:

public abstract class BlockingDictionary<TKey, TValue> : IProducerConsumerCollection<KeyValuePair<TKey, TValue>> where TKey : notnull
{
    protected ConcurrentDictionary<TKey, TValue> _dictionary = new ConcurrentDictionary<TKey, TValue>();

    int ICollection.Count => _dictionary.Count;

    bool ICollection.IsSynchronized => false;

    object ICollection.SyncRoot => throw new NotSupportedException();

    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
    {
        if (array == null)
        {
            throw new ArgumentNullException("array");
        }
        _dictionary.ToList().CopyTo(array, index);
    }

    void ICollection.CopyTo(Array array, int index)
    {

        if (array == null)
        {
            throw new ArgumentNullException("array");
        }
        ((ICollection)_dictionary.ToList()).CopyTo(array, index);
    }

    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
    {
        return ((IEnumerable<KeyValuePair<TKey, TValue>>)_dictionary).GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return ((IEnumerable<KeyValuePair<TKey, TValue>>)this).GetEnumerator();
    }

    public KeyValuePair<TKey, TValue>[] ToArray()
    {
        return _dictionary.ToList().ToArray();
    }

    bool IProducerConsumerCollection<KeyValuePair<TKey, TValue>>.TryAdd(KeyValuePair<TKey, TValue> item)
    {
        return _dictionary.TryAdd(item.Key, item.Value);
    }

    public virtual bool TryTake(out KeyValuePair<TKey, TValue> item)
    {
        item = this.FirstOrDefault();
        TValue? value;

        return _dictionary.TryRemove(item.Key, out value);
    }
}

Time-sequence queue implementation (inherits from the class above):

public class TimeSequenceQueue<T> : BlockingDictionary<DateTime, T>
{
    private DateTime _previousTime;
    private DateTime _nextTime;
    private readonly int _intervalSeconds;

    public TimeSequenceQueue(DateTime startTime, int intervalSeconds)
    {
        _intervalSeconds = intervalSeconds;
        _previousTime = startTime;
        _nextTime = startTime;
    }

    public override bool TryTake([MaybeNullWhen(false)] out KeyValuePair<DateTime, T> item)
    {
        item = _dictionary.SingleOrDefault(x => x.Key == _nextTime);
        T? value = default(T);

        if (item.Value == null)
            return false;

        bool result = _dictionary.TryRemove(item.Key, out value);

        if (result)
        {
            _previousTime = _nextTime;
            _nextTime = _nextTime.AddSeconds(_intervalSeconds);
        }

        return result;
    }
}

Usage:

BlockingCollection<KeyValuePair<DateTime, object>> _queue = new BlockingCollection<KeyValuePair<DateTime, object>>(new TimeSequenceQueue<object>());

Consuming loop - started in new thread:

foreach (var item in _queue.GetConsumingEnumerable())
{
    // feed downstream
}
jonl
  • Somewhat related: [How to wrap ConcurrentDictionary in BlockingCollection?](https://stackoverflow.com/questions/10736209/how-to-wrap-concurrentdictionary-in-blockingcollection) – Theodor Zoulias May 11 '22 at 00:50
  • Hi Theodor, thanks for the comments. I did see the other post you linked, which is what helped lead me to the code I currently have. I've tried to narrow the question down (edited above) so it isn't so broad as to need all the info you suggested. I guess the main thing I would be interested in is how to handle the error in the GetConsumingEnumerable loop without having to exit the loop – jonl May 11 '22 at 10:53
  • @jlangdonbtinternetcom the async queue type is [Channel](https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/). It's built precisely for producer/consumer scenarios and pipelines, which is why producers and consumers use different APIs, ChannelWriter and ChannelReader. You can eliminate all of the code you posted. ChannelReaders can provide incoming messages through an IAsyncEnumerable which means asynchronously processing incoming messages is as simple as `await foreach(var msg in reader.ReadAllAsync()){...}` – Panagiotis Kanavos May 11 '22 at 10:55
  • Regarding the high-performance asynchronous [`Channel`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels.channel-1) class that was suggested in a previous comment, be aware of this issue: [Channels with CancellationTokenSource with timeout memory leak after dispose](https://stackoverflow.com/questions/67573683/channels-with-cancellationtokensource-with-timeout-memory-leak-after-dispose). It's quite likely that in order to implement the functionality that you want, you would settle in the usage pattern that allows this memory leak to appear. – Theodor Zoulias May 11 '22 at 15:55
  • Thanks for the responses. @PanagiotisKanavos, thanks for the pointer to channels. I've been looking through the docs and playing around - looks really interesting and definitely could be used for what I need. It feels like a vast topic though (especially as I believe I'd need to create my own channel/reader and writer for my purpose), and not sure I'm confident I could implement something robust (as in, be sure I understood what was under the hood) in a reasonable time-frame. Will definitely come back to it at some point though. – jonl May 12 '22 at 14:46
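
For readers following the Channel suggestion in the comments, here is a minimal sketch of the producer/consumer shape it describes. The class name `ChannelDemo`, the `int` payload and the unbounded channel are assumptions made for illustration. Note that a channel hands items out in the order they were written, so on its own it does not restore the time sequence asked about in the question.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelDemo
{
    // Hypothetical demo: writes three numbers to a channel and reads them back.
    public static async Task<List<int>> RunAsync()
    {
        var channel = Channel.CreateUnbounded<int>();

        // Producer side uses the ChannelWriter half of the channel.
        var producer = Task.Run(async () =>
        {
            for (int i = 1; i <= 3; i++)
                await channel.Writer.WriteAsync(i);

            // Signals that no more items will arrive, which ends ReadAllAsync.
            channel.Writer.Complete();
        });

        // Consumer side uses the ChannelReader half and awaits each item.
        var received = new List<int>();
        await foreach (var msg in channel.Reader.ReadAllAsync())
            received.Add(msg);

        await producer;
        return received;
    }
}
```

`ReadAllAsync` requires .NET Core 3.0 or later, and `await foreach` requires C# 8.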

1 Answer

When using the GetConsumingEnumerable() for loop, and the next key in the sequence is not present in the dictionary [...] I would like to wait for a specified amount of time and then attempt to take the item from the queue again.

I will try to answer this question generally, without paying too much attention to the specifics of your problem. So let's say that you are consuming a BlockingCollection<T> like this:

foreach (var item in collection.GetConsumingEnumerable())
{
    // Do something with the consumed item.
}

...and you want to avoid waiting indefinitely for an item to arrive. You want to wake up every 5 seconds and do something, before waiting/sleeping again. Here is how you could do it:

while (!collection.IsCompleted)
{
    bool consumed = collection.TryTake(out var item, TimeSpan.FromSeconds(5));
    if (consumed)
    {
        // Do something with the consumed item.
    }
    else
    {
        // Do something before trying again to take an item.
    }
}

The above pattern imitates the actual source code of the BlockingCollection<T>.GetConsumingEnumerable method.
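
To try the timed loop end to end, here is a minimal self-contained sketch. The class name `TimedConsumerDemo`, the 100 ms timeout and the 300 ms producer pause are assumptions picked for illustration; it uses a plain BlockingCollection<int> rather than the custom collection from the question.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class TimedConsumerDemo
{
    // Hypothetical demo: returns a log of what the consumer observed.
    public static List<string> Run()
    {
        var log = new List<string>();
        using var collection = new BlockingCollection<int>();

        // Producer: adds an item, pauses longer than the consumer's timeout,
        // adds a second item, then marks the collection as complete.
        var producer = Task.Run(() =>
        {
            collection.Add(1);
            Thread.Sleep(300);
            collection.Add(2);
            collection.CompleteAdding();
        });

        while (!collection.IsCompleted)
        {
            if (collection.TryTake(out int item, TimeSpan.FromMilliseconds(100)))
            {
                log.Add($"item {item}");
            }
            else if (!collection.IsCompleted)
            {
                // Woke up without an item while the producer is still active.
                log.Add("timeout");
            }
        }

        producer.Wait();
        return log;
    }
}
```

The log should contain both items in order, typically with one or more "timeout" entries between them; the exact number of timeouts depends on thread scheduling.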

If you want to get fancy you could incorporate this functionality in a custom extension method for the BlockingCollection<T> class, like this:

public static IEnumerable<(bool Consumed, T Item)> GetConsumingEnumerable<T>(
    this BlockingCollection<T> source, TimeSpan timeout)
{
    while (!source.IsCompleted)
    {
        bool consumed = source.TryTake(out var item, timeout);
        yield return (consumed, item);
    }
}

Usage example:

foreach (var (consumed, item) in collection.GetConsumingEnumerable(
    TimeSpan.FromSeconds(5)))
{
    // Do something depending on whether an item was consumed or not.
}
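
As one possible way to fill in that loop body for the time-series case, here is a rough sketch that buffers early arrivals and releases them in sequence. It is not taken from the question or the answer: `ReorderingConsumer`, `ConsumeInOrder` and `pending` are hypothetical names, and it assumes a plain BlockingCollection (default queue storage) instead of the custom TimeSequenceQueue, so the underlying TryTake never reports a missing key. The extension method is repeated so the sketch compiles on its own.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class BlockingCollectionExtensions
{
    // Same shape as the extension method shown above.
    public static IEnumerable<(bool Consumed, T Item)> GetConsumingEnumerable<T>(
        this BlockingCollection<T> source, TimeSpan timeout)
    {
        while (!source.IsCompleted)
        {
            bool consumed = source.TryTake(out var item, timeout);
            yield return (consumed, item);
        }
    }
}

public static class ReorderingConsumer
{
    // Hypothetical helper: drains `source` and returns the values in timestamp
    // order, starting at `startTime` and stepping by `interval`.
    public static List<TValue> ConsumeInOrder<TValue>(
        BlockingCollection<KeyValuePair<DateTime, TValue>> source,
        DateTime startTime, TimeSpan interval, TimeSpan timeout)
    {
        var pending = new Dictionary<DateTime, TValue>(); // items that arrived early
        var ordered = new List<TValue>();
        DateTime next = startTime;

        foreach (var (consumed, item) in source.GetConsumingEnumerable(timeout))
        {
            // Timed out: a real consumer might log here, or decide to skip `next`.
            if (!consumed) continue;

            pending[item.Key] = item.Value;

            // Release every buffered item that is now next in the sequence.
            while (pending.Remove(next, out var value))
            {
                ordered.Add(value);
                next += interval;
            }
        }

        return ordered;
    }
}
```

In this sketch a timestamp that never arrives holds back everything after it until the collection is completed, so the timeout branch is the natural place to decide whether to keep waiting or move on to the next timestamp.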
Theodor Zoulias
  • Thanks for this answer @TheodorZoulias. I believe I am able to use this to progress with what I need at the moment so was very helpful – jonl May 12 '22 at 14:49
  • @jonl if you want ideas about how to implement the `BlockingCollection`-like structure that reorders the incoming messages, you could take a look at this question: [How to restore the order of a shuffled Dataflow pipeline?](https://stackoverflow.com/questions/65358480/how-to-restore-the-order-of-a-shuffled-dataflow-pipeline) Feel free to come back with a new question if you get stuck! – Theodor Zoulias May 12 '22 at 20:14
  • Thanks for the link @TheodorZoulias. Will check that out – jonl May 16 '22 at 18:32