New poster here, so I hope this makes sense.
I need to create a collection that I can remove items from in sequence (basically stock market time series data). The data producer is multi-threaded and doesn't guarantee that the data will come in sequence.
I've looked all around for a solution, but the only thing I can come up with is to create my own custom dictionary, wrapping ConcurrentDictionary and implementing the IProducerConsumerCollection interface so it can be used with BlockingCollection.
The code below works, but it throws
System.InvalidOperationException: The underlying collection was modified from outside of the BlockingCollection
inside the GetConsumingEnumerable() foreach loop whenever the next key in the sequence is not yet in the dictionary. In that case I would like to wait for a specified amount of time and then try to take the item from the queue again.
My question is:
- What's the best way to handle the case where the next key is not present? At the moment, handling the exception seems to require exiting the loop. Perhaps GetConsumingEnumerable() is not the right way to consume, and a while loop would work better?
Code is below - any help/ideas much appreciated.
IProducerConsumerCollection implementation:
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;

public abstract class BlockingDictionary<TKey, TValue> : IProducerConsumerCollection<KeyValuePair<TKey, TValue>> where TKey : notnull
{
    protected readonly ConcurrentDictionary<TKey, TValue> _dictionary = new ConcurrentDictionary<TKey, TValue>();

    int ICollection.Count => _dictionary.Count;
    bool ICollection.IsSynchronized => false;
    object ICollection.SyncRoot => throw new NotSupportedException();

    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
    {
        if (array == null)
        {
            throw new ArgumentNullException(nameof(array));
        }
        // ToArray() takes a consistent snapshot; LINQ's ToList() can race with concurrent writers.
        _dictionary.ToArray().CopyTo(array, index);
    }

    void ICollection.CopyTo(Array array, int index)
    {
        if (array == null)
        {
            throw new ArgumentNullException(nameof(array));
        }
        _dictionary.ToArray().CopyTo(array, index);
    }

    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
    {
        return _dictionary.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public KeyValuePair<TKey, TValue>[] ToArray()
    {
        return _dictionary.ToArray();
    }

    bool IProducerConsumerCollection<KeyValuePair<TKey, TValue>>.TryAdd(KeyValuePair<TKey, TValue> item)
    {
        return _dictionary.TryAdd(item.Key, item.Value);
    }

    public virtual bool TryTake(out KeyValuePair<TKey, TValue> item)
    {
        // Remove the first entry that can still be removed; avoids passing a
        // default (possibly null) key to TryRemove when the dictionary is empty.
        foreach (var pair in _dictionary)
        {
            if (_dictionary.TryRemove(pair.Key, out var value))
            {
                item = new KeyValuePair<TKey, TValue>(pair.Key, value);
                return true;
            }
        }
        item = default;
        return false;
    }
}
Time sequence queue implementation (inherits from the class above):
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;

public class TimeSequenceQueue<T> : BlockingDictionary<DateTime, T>
{
    private DateTime _previousTime;
    private DateTime _nextTime;
    private readonly int _intervalSeconds;

    public TimeSequenceQueue(DateTime startTime, int intervalSeconds)
    {
        _intervalSeconds = intervalSeconds;
        _previousTime = startTime;
        _nextTime = startTime;
    }

    public override bool TryTake([MaybeNullWhen(false)] out KeyValuePair<DateTime, T> item)
    {
        // Look the next key up directly instead of scanning the dictionary.
        // Returning false while BlockingCollection still counts items as
        // available is what triggers the InvalidOperationException.
        if (!_dictionary.TryRemove(_nextTime, out var value))
        {
            item = default;
            return false;
        }
        item = new KeyValuePair<DateTime, T>(_nextTime, value);
        _previousTime = _nextTime;
        _nextTime = _nextTime.AddSeconds(_intervalSeconds);
        return true;
    }
}
Usage:
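// startTime and intervalSeconds stand for whatever values the feed uses.
BlockingCollection<KeyValuePair<DateTime, object>> _queue = new BlockingCollection<KeyValuePair<DateTime, object>>(new TimeSequenceQueue<object>(startTime, intervalSeconds));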
Consuming loop - started in new thread:
foreach (var item in _queue.GetConsumingEnumerable())
{
    // feed downstream
}
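For reference, here is a rough sketch of the while-loop idea from my question, assuming the consumer is allowed to poll a ConcurrentDictionary directly instead of going through BlockingCollection. SequencedConsumer, Consume, and all of its parameters are names I made up for illustration; this is not part of the code above, and I'm not sure it is the right design.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper (not part of the classes above): takes items in key
// order by polling the dictionary, waiting when the next key is missing.
public static class SequencedConsumer
{
    public static void Consume<T>(
        ConcurrentDictionary<DateTime, T> source,
        DateTime startTime,
        TimeSpan interval,
        TimeSpan retryDelay,
        Action<DateTime, T> handle,
        CancellationToken token)
    {
        DateTime next = startTime;
        while (!token.IsCancellationRequested)
        {
            if (source.TryRemove(next, out var value))
            {
                // The expected key was present: hand it downstream and advance.
                handle(next, value);
                next = next.Add(interval);
            }
            else if (token.WaitHandle.WaitOne(retryDelay))
            {
                // WaitOne returns true only when cancellation was signalled
                // during the wait; otherwise we fall through and retry.
                break;
            }
        }
    }
}
```

Producers would add with source.TryAdd(timestamp, value) from any thread, and the consumer thread would call SequencedConsumer.Consume with the same start time and interval as the feed. The obvious downside is that this polls rather than blocks, and it waits forever on a key that never arrives unless some skip-after-timeout rule is added.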