41

Quite simple: other than ConcurrentDictionary (which I'll use if I have to, but it's not really the right concept), is there any concurrent collection (an IProducerConsumerCollection<T> implementation) that supports removing specific items, either by simple equality or by a predicate defining the condition for removal?

Explanation: I have a multi-threaded, multi-stage workflow algorithm, which pulls objects from the DB and sticks them in a "starting" queue. From there they are grabbed by the next stage, further worked on, and stuffed into other queues. This process continues through a few more stages. Meanwhile, the first stage is invoked again by its supervisor and pulls objects out of the DB, and those can include objects still in process (because they haven't finished being processed and so haven't been re-persisted with the flag set saying they're done).

The solution I am designing is a master "in work" collection; objects go into that collection when they are retrieved for processing by the first stage, and are removed after they have been re-saved to the DB as "processed" by whatever stage of the workflow completed the necessary processing. While an object is in that collection, it will be ignored if it is re-retrieved by the first stage.

I had planned to use a ConcurrentBag, but the only removal method (TryTake) removes an arbitrary item from the bag, not a specified one (and ConcurrentBag is slow in .NET 4). ConcurrentQueue and ConcurrentStack also do not allow removal of an item other than the next one it'll give you, leaving ConcurrentDictionary, which would work but is more than I need (all I really need is to store the Id of the records being processed; they don't change during the workflow).

KeithS
  • how do you feel about using ReaderWriterLockSlim and a List? or perhaps rolling your own concurrent collection – Frobzig Jul 27 '12 at 21:12
  • @Frobzig - Ambivalent to mildly interested. I like the Concurrent collections because they just work; very little code involved. – KeithS Jul 27 '12 at 21:25
  • Something like Kafka is good for processing queues rather than trying to write your own – Mark Homer Jan 28 '23 at 15:10

3 Answers

24

The reason there is no such data structure is that, for most collections, lookup operations such as IndexOf and Remove(element) take O(n) time: they enumerate the elements and check each one for equality.

Only hash tables offer O(1) lookup (on average). In a concurrent scenario, an O(n) lookup would mean holding a lock on the collection for a long time, and other threads would not be able to add elements during that time.

In a dictionary, only the bucket the hash maps to needs to be locked. Other threads can continue adding elements while one thread checks for equality among the elements in that bucket.

My advice is to go ahead and use ConcurrentDictionary.
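As a rough sketch of that advice, assuming the records are identified by an `int` Id: the dictionary's keys can act as the "in work" set, with a throwaway `byte` as the value. The names `InWorkRegistry`, `TryBegin`, `Complete`, and `CountWinners` are made up for illustration and do not come from the question.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical "in work" registry: keys are record Ids, values are unused.
public sealed class InWorkRegistry
{
    private readonly ConcurrentDictionary<int, byte> _inWork =
        new ConcurrentDictionary<int, byte>();

    // Returns true only for the one caller that claims the Id first.
    public bool TryBegin(int id)
    {
        return _inWork.TryAdd(id, 0);
    }

    // Call after the record has been re-saved as "processed".
    public bool Complete(int id)
    {
        byte ignored;
        return _inWork.TryRemove(id, out ignored);
    }
}

public static class InWorkDemo
{
    // Lets several parallel callers race to claim the same Id
    // and counts how many of them succeeded.
    public static int CountWinners(InWorkRegistry registry, int id, int contenders)
    {
        int winners = 0;
        Parallel.For(0, contenders, i =>
        {
            if (registry.TryBegin(id))
            {
                Interlocked.Increment(ref winners);
            }
        });
        return winners;
    }
}
```

Because `TryAdd` is atomic, `CountWinners` should report exactly one winner for a fresh Id no matter how many contenders race, which is the property the first stage needs in order to skip records that are already being processed.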


By the way, you are right that ConcurrentDictionary is a bit oversized for your solution. What you really need is a quick way to check whether an object is in work or not. A HashSet would be perfect for that. It does basically nothing other than Add(element), Contains(element), and Remove(element). There are ConcurrentHashSet implementations for Java. For C# I found this: "How to implement ConcurrentHashSet in .Net"; I don't know how good it is.

As a first step, I would still write a wrapper with a HashSet-style interface around ConcurrentDictionary, get it up and running, and then try different implementations and compare their performance.
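Such a wrapper might look roughly like the following. This is a minimal sketch, not a full `ISet<T>` implementation: the class name `ConcurrentHashSet<T>` and its member list are assumptions chosen to mirror the three operations mentioned above.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical set-like wrapper; only the dictionary's keys carry meaning.
public sealed class ConcurrentHashSet<T>
{
    private readonly ConcurrentDictionary<T, byte> _items;

    public ConcurrentHashSet()
        : this(EqualityComparer<T>.Default)
    {
    }

    public ConcurrentHashSet(IEqualityComparer<T> comparer)
    {
        _items = new ConcurrentDictionary<T, byte>(comparer);
    }

    // True if the item was not present and has been added.
    public bool Add(T item)
    {
        return _items.TryAdd(item, 0);
    }

    public bool Contains(T item)
    {
        return _items.ContainsKey(item);
    }

    // True if the item was present and has been removed.
    public bool Remove(T item)
    {
        byte ignored;
        return _items.TryRemove(item, out ignored);
    }

    public int Count
    {
        get { return _items.Count; }
    }
}
```

Keeping the surface this small makes it easy to swap the backing store for another implementation later and measure the difference.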

George Mamaladze
6

As the other posts already explain, it's not possible to remove arbitrary items from a Queue or ConcurrentQueue by default, but the easiest way around this is to extend or wrap the item.

public class QueueItem
{
    public Boolean IsRemoved { get; private set; }
    public void Remove() { IsRemoved = true; }
}

And when dequeuing:

QueueItem item = _Queue.Dequeue(); // Or TryDequeue if you use a ConcurrentQueue
if (!item.IsRemoved)
{
    // Do work here
}
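
With a ConcurrentQueue, the same idea might be sketched as below. The `Id` property, the `TombstoneQueue` helper, and `TryDequeueLive` are illustrative assumptions; the flag is stored as an `int` so it can be written with `Interlocked` and read with `Volatile`, which makes a removal on one thread visible to the consumer thread.

```csharp
using System.Collections.Concurrent;
using System.Threading;

public class QueueItem
{
    private int _removed; // 0 = live, 1 = marked as removed

    public QueueItem(int id)
    {
        Id = id;
    }

    public int Id { get; private set; }

    public bool IsRemoved
    {
        get { return Volatile.Read(ref _removed) == 1; }
    }

    public void Remove()
    {
        Interlocked.Exchange(ref _removed, 1);
    }
}

public static class TombstoneQueue
{
    // Dequeues until a live item is found; items marked as removed are dropped.
    public static bool TryDequeueLive(ConcurrentQueue<QueueItem> queue, out QueueItem item)
    {
        while (queue.TryDequeue(out item))
        {
            if (!item.IsRemoved)
            {
                return true;
            }
        }
        item = null;
        return false;
    }
}
```

Note that an item can still be marked as removed after it has been dequeued, so a worker that must honor late removals would need to re-check `IsRemoved` before committing its work.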
Felix K.
  • This solution causes a memory leak. – Kirill Bestemyanov Sep 10 '15 at 16:45
  • @KirillBestemyanov No, how should it cause a memory leak? – Felix K. Sep 11 '15 at 16:08
  • You don't delete the task from the collection, so the memory consumed by the collection only grows while it's running. – Kirill Bestemyanov Sep 12 '15 at 14:44
  • @KirillBestemyanov If you want to remove the items when they got removed you should not use a Queue, instead use something else. By definition it's not a memory leak, it's a misuse of the Queue. – Felix K. Sep 12 '15 at 14:59
  • And why do you recommend this solution to the questioner, if it is a misuse and the application will slowly eat memory during execution? – Kirill Bestemyanov Sep 14 '15 at 07:32
  • @KirillBestemyanov Actually the asker of the question asked also about `ConcurrentQueue`, that's why it's here. You can also have memory leaks when misusing a `ConcurrentDictionary`, so what is the actual purpose of your comment? This solution does indeed make sense in some situations where you are frequently removing elements from the queue anyway and the removed ones are discarded. Removing the items from a queue in a different way is very time consuming even if you are implementing your own queue. – Felix K. Sep 14 '15 at 10:21
  • The point of my comment is to show readers the weaknesses of this solution. – Kirill Bestemyanov Sep 14 '15 at 11:17
  • @KirillBestemyanov And this is actually wrong in my opinion, because you usually use a Queue when doing regular dequeuing, otherwise a LinkedList is much better for this purpose. – Felix K. Sep 14 '15 at 17:33
  • This approach is known as a **tombstone**. It refers to the boolean that marks the item as deleted. – Timo Jan 24 '22 at 12:48
  • @Timo Good to know, thank you. – Felix K. Jan 28 '22 at 08:21
3

It's really hard to make a collection thread-safe in the generic sense. Many of the factors that determine whether a class is truly "thread-safe" lie outside the responsibility or purview of a library/framework class... One of the drawbacks, as you've pointed out, is performance. It's very hard to write a performant collection that is also thread-safe in every scenario, because it has to assume the worst...

The generally recommended practice is to use whatever collection you want and access it in a thread-safe way. This is basically why there aren't more thread-safe collections in the framework. More on this can be found at http://blogs.msdn.com/b/bclteam/archive/2005/03/15/396399.aspx#9534371
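
For the scenario in the question, that practice could be sketched as an ordinary `HashSet<int>` guarded by a single lock. The class name `LockedIdSet` and its members are hypothetical; the point is only that every access goes through the same lock object, which also makes predicate-based removal straightforward.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical wrapper: a plain HashSet accessed only under one lock.
public sealed class LockedIdSet
{
    private readonly object _gate = new object();
    private readonly HashSet<int> _ids = new HashSet<int>();

    public bool Add(int id)
    {
        lock (_gate) { return _ids.Add(id); }
    }

    public bool Contains(int id)
    {
        lock (_gate) { return _ids.Contains(id); }
    }

    public bool Remove(int id)
    {
        lock (_gate) { return _ids.Remove(id); }
    }

    // Removes every Id matching the predicate; returns how many were removed.
    public int RemoveWhere(Predicate<int> match)
    {
        lock (_gate) { return _ids.RemoveWhere(match); }
    }
}
```

The trade-off is that `RemoveWhere` holds the lock for an O(n) scan, so other threads wait for its duration; whether that matters depends on how large the set gets and how often it is scanned.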

Peter Ritchie
  • The difference between thread-safe access to a collection and accessing a thread-safe collection can be quite large. If the number of threads is large and the operations they are conducting are non-trivial, then there could be a huge bottleneck on each and every thread waiting to acquire and release its own lock. I suppose you could maintain an arbitrarily large collection of lock objects (corresponding to e.g. each hash), but that would be pretty messy... – Patrick M Jul 27 '12 at 22:18
  • @PatrickM The set of operations needed to make a particular application's access to a collection thread-safe is much smaller than what is needed to make a collection thread-safe in every scenario it can be used in. You should only need one lock object to make access to any given object (collection or not) thread-safe. All the Concurrent* collections only use one lock object. – Peter Ritchie Jul 28 '12 at 00:11
  • In many cases, it's not hard to design fast thread-safe collections that implement certain subsets of the normal interfaces (e.g. an `IList` which supported `Add` and write-by-index as the only mutation methods). Such things may be more efficient than collections which provide more generalized access, but are rare in the Framework. The only example that comes to mind is `ConditionalWeakTable`, which is like a dictionary, but doesn't allow items to be removed. – supercat Jan 06 '14 at 22:00