I'm trying to take all items in one fell swoop from a ConcurrentBag. Since there's nothing like TryEmpty on the collection, I've resorted to using Interlocked.Exchange in the same fashion as described here: How to remove all Items from ConcurrentBag?

My code looks like this:

private ConcurrentBag<Foo> _allFoos; //Initialized in constructor.

public bool LotsOfThreadsAccessingThisMethod(Foo toInsert)
{
    this._allFoos.Add(toInsert);
    return true;
}

public void SingleThreadProcessingLoopAsALongRunningTask(object state)
{
    var token = (CancellationToken) state;
    var workingSet = new List<Foo>();

    while (!token.IsCancellationRequested)
    {
        if (!workingSet.Any())
        {
            workingSet = Interlocked.Exchange(ref this._allFoos, new ConcurrentBag<Foo>()).ToList();
        }

        var processingCount = (int)Math.Min(workingSet.Count, TRANSACTION_LIMIT);

        if (processingCount > 0)
        {
            using (var ctx = new MyEntityFrameworkContext())
            {
                ctx.BulkInsert(workingSet.Take(processingCount));
            }
            workingSet.RemoveRange(0, processingCount);
        }
    }
}

The problem is that this sometimes misses items that are added to the bag. I've written a test application that feeds data to the method that calls ConcurrentBag.Add and verified that it is sending all of the data. When I set a breakpoint on the Add call and check the count of the ConcurrentBag afterwards, it's zero. The item just isn't being added.

I'm fairly positive that it's because the Interlocked.Exchange call doesn't use the internal locking mechanism of the ConcurrentBag so it's losing data somewhere in the swap, but I have no knowledge of what's actually happening.

How can I just grab all the items out of the ConcurrentBag at one time without resorting to my own locking mechanism? And why does Add ignore the item?

Brandon
  • Looks like you're implementing a producer-consumer pattern. Have you considered using *TPL Dataflow*? – Yuval Itzchakov Oct 15 '15 at 16:00
  • No, I've actually never seen that before. Reading up on it now. – Brandon Oct 15 '15 at 16:12
  • "without resorting to my own locking mechanism" -- since `ConcurrentBag` doesn't provide such a mechanism in the first place, why do you think it's even possible to do so without implementing your own? What is wrong with just surrounding the code with a `lock` and being done with it? Please note that `Interlocked.Exchange` only protects the variable when every other access goes through it as well; otherwise, you still have the possibility of the variable being used in an unsynchronized way (i.e. some other code getting the old value and using it even while you are trying to pull items from the collection). – Peter Duniho Oct 15 '15 at 16:25
  • And if you are just trying to implement producer-consumer, note that `BlockingCollection` (which you can initialize with `ConcurrentBag`) provides easy-to-use, straightforward producer-consumer semantics. If all you really want is producer-consumer, you can just use that. – Peter Duniho Oct 15 '15 at 16:26
  • Then what's the benefit of `ConcurrentBag` over a list if there's no internal locking mechanism when accessing the elements? – Brandon Oct 15 '15 at 16:39
  • Also, `BlockingCollection` doesn't provide a method to empty the entire collection. Items will never stop adding to the collection in my application so `CompleteAdding()` is not useful for me. I haven't implemented my own locks because the actual method (not the MCVE) has 3 such `ConcurrentBag`s, which means 3 locks... – Brandon Oct 15 '15 at 16:48
  • You lose items because Interlocked.Exchange replaces the bag with a new one, but nothing guarantees that no other thread still holds a reference to the old, swapped-out bag. You copy the items out to a list, but another thread can wake up after that and still add elements to your "old" bag. That is how you lose items. Ivan's solution looks nice. Try that one. – Alois Kraus Oct 15 '15 at 17:37
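
The interleaving described in the comments above can be replayed on a single thread, which makes the loss deterministic. This is only a minimal sketch: `SwapRaceDemo` and `CountLostItems` are hypothetical names, and the three steps are run in sequence to imitate what two threads could do in that order.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class SwapRaceDemo
{
    private static ConcurrentBag<int> _bag = new ConcurrentBag<int>();

    // Replays one unlucky interleaving and reports how many items went missing.
    public static int CountLostItems()
    {
        _bag = new ConcurrentBag<int>();

        // Step 1 (producer): reads the field, as "this._allFoos.Add(...)" does
        // before it actually calls Add.
        ConcurrentBag<int> seenByProducer = _bag;

        // Step 2 (consumer): swaps in a fresh bag and snapshots the old one.
        List<int> snapshot =
            Interlocked.Exchange(ref _bag, new ConcurrentBag<int>()).ToList();

        // Step 3 (producer): finally adds, but to the bag that was swapped out.
        seenByProducer.Add(42);

        // The item is in neither the snapshot nor the current bag.
        bool found = snapshot.Contains(42) || _bag.Contains(42);
        return found ? 0 : 1;
    }
}
```

The item ends up in a bag that nobody will read again. This would also be consistent with seeing a count of zero in the debugger right after `Add`, if the count being inspected is that of the field's new bag rather than the one the item went into.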

1 Answer

I don't think you need to take all the items from the ConcurrentBag at once. You can get exactly the behavior you are after simply by changing the processing logic as follows (no need for your own synchronization or interlocked swaps):

public void SingleThreadProcessingLoopAsALongRunningTask(object state)
{
    var token = (CancellationToken)state;
    var buffer = new List<Foo>(TRANSACTION_LIMIT);
    while (!token.IsCancellationRequested)
    {
        Foo item;
        if (!this._allFoos.TryTake(out item))
        {
            if (buffer.Count == 0) continue;
        }
        else
        {
            buffer.Add(item);
            if (buffer.Count < TRANSACTION_LIMIT) continue;
        }
        using (var ctx = new MyEntityFrameworkContext())
        {
            ctx.BulkInsert(buffer);
        }
        buffer.Clear();
    }
}
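
The draining step of the loop above could also be pulled out into a small helper, which makes it easier to reuse across several bags. This is a sketch only: `BagBatching` and `TakeBatch` are hypothetical names, not part of the original code or of the .NET API. It relies solely on `ConcurrentBag<T>.TryTake`.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class BagBatching
{
    // Takes up to 'limit' items out of the bag without any extra locking.
    // Returns an empty list when the bag is currently empty.
    public static List<T> TakeBatch<T>(ConcurrentBag<T> bag, int limit)
    {
        var batch = new List<T>(limit);
        T item;
        // TryTake removes one item atomically; items added concurrently
        // stay in the same bag and are picked up by a later call.
        while (batch.Count < limit && bag.TryTake(out item))
        {
            batch.Add(item);
        }
        return batch;
    }
}
```

Because the bag instance is never replaced, a producer cannot end up adding to a collection that has already been discarded. Note that `ConcurrentBag` makes no ordering guarantee, so the items in a batch may come out in any order.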
Ivan Stoev
  • I like this. I have to make a few modifications, though, for the case where data stops for some reason: this will wait indefinitely until the transaction limit is hit... I'll see if I can work with this. Thanks – Brandon Oct 15 '15 at 17:21
  • @Brandon It will not. See the body of the `if`. If there is nothing in the bag, and you have something in the buffer, it would commit. – Ivan Stoev Oct 15 '15 at 17:22
  • Yep... Yep it will... That's my bad. Been a long day. – Brandon Oct 15 '15 at 17:25
  • I made some tweaks to this for my 3 instances of `ConcurrentBag`, and it works like a charm. Now, this solved my problem but didn't answer the question. In the spirit of SO, do I still mark this as the answer? – Brandon Oct 15 '15 at 18:11
  • @Brandon I'm afraid I cannot answer that question, which seems to be more appropriate for `meta`. But if we consider your original question to be an XY problem, I guess you can :-) – Ivan Stoev Oct 15 '15 at 18:14
  • @Brandon Not that I insist, but since you asked, here is an excerpt from the help center: "To accept an answer: Choose one answer that you believe is the **best solution** to **your problem**." – Ivan Stoev Oct 15 '15 at 18:24