2

I was asking for any ConcurentSet/ConcurentQueue implementation here how to create no-duplicates ConcurrentQueue? but noone suggested anything.

So I've decided to wrote it myself, modified MSDN example slightly. I've created ConcurentSet class. I insert the same element twice, and expect it to be inserted only once, because Set is used inside:

                numbers.Add(i);
                Console.WriteLine("Add:{0} Count={1}", i, numbers.Count);
                numbers.Add(i);
                Console.WriteLine("Add:{0} Count={1}", i, numbers.Count);

However according to output elements inserted twice:

    Add:0 Count=0
    Add:0 Count=1
    Add:1 Count=2
    Add:1 Count=3
    Add:2 Count=4
    Take:0
    Add:2 Count=5
    Add:3 Count=6
    Add:3 Count=7
    Add:4 Count=7
    Add:4 Count=8
    Take:3
    Add:5 Count=9
    Add:5 Count=10
    Add:6 Count=11
    Add:6 Count=12

The question is - why my ConcurentSet implementation doesn't work as expected? I expect such output:

    Add:0 Count=0
    Add:0 Count=0
    Add:1 Count=1
    Add:1 Count=1
    Add:2 Count=2
    Take:0
    Add:2 Count=1
    Add:3 Count=2
    Add:3 Count=2
    Add:4 Count=3
    Add:4 Count=3
    Take:3
    Add:5 Count=3
    Add:5 Count=3
    .....

Full listing below:

using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace BCBlockingAccess
{
using System;
using System.Collections.Concurrent;

public class ConcurentSet
: IProducerConsumerCollection<int>
{
    private readonly object m_lockObject = new object();
    private readonly HashSet<int> m_set = new HashSet<int>();

    public void CopyTo(Array array, int index)
    {
        throw new NotImplementedException();
    }

    public int Count { get { return m_set.Count; } }

    public object SyncRoot { get { return m_lockObject; } }

    public bool IsSynchronized { get { return true; } }

    public void CopyTo(int[] array, int index)
    {
        throw new NotImplementedException();
    }

    public bool TryAdd(int item)
    {
        lock (m_lockObject)
        {
            m_set.Add(item);
        }

        return true;
    }

    public bool TryTake(out int item)
    {
        lock (m_lockObject)
        {
            foreach (var i in m_set)
            {
                if (m_set.Remove(i))
                {
                    item = i;
                    return true;
                }
            }
            item = -1;
            return false;
        }
    }

    public int[] ToArray()
    {
        throw new NotImplementedException();
    }

    public IEnumerator<int> GetEnumerator()
    {
        throw new NotImplementedException();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}


class Program
{
    static void Main(string[] args)
    {
        // Increase or decrease this value as desired.
        int itemsToAdd = 50;

        // Preserve all the display output for Adds and Takes
        Console.SetBufferSize(80, (itemsToAdd * 5) + 3);

        // A bounded collection. Increase, decrease, or remove the 
        // maximum capacity argument to see how it impacts behavior.
        BlockingCollection<int> numbers = new BlockingCollection<int>(new ConcurentSet());


        // A simple blocking consumer with no cancellation.
        Task.Factory.StartNew(() =>
        {
            int i = -1;
            while (!numbers.IsCompleted)
            {
                try
                {
                    i = numbers.Take();
                }
                catch (InvalidOperationException)
                {
                    Console.WriteLine("Adding was compeleted!");
                    break;
                }
                Console.WriteLine("Take:{0} ", i);

                // Simulate a slow consumer. This will cause
                // collection to fill up fast and thus Adds wil block.
                Thread.SpinWait(100000);
            }

            Console.WriteLine("\r\nNo more items to take. Press the Enter key to exit.");
        });

        // A simple blocking producer with no cancellation.
        Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < itemsToAdd; i++)
            {
                numbers.Add(i);
                Console.WriteLine("Add:{0} Count={1}", i, numbers.Count);
                numbers.Add(i);
                Console.WriteLine("Add:{0} Count={1}", i, numbers.Count);
            }

            // See documentation for this method.
            numbers.CompleteAdding();
        });

        // Keep the console display open in debug mode.

        Console.ReadLine();
    }
}
}
Community
  • 1
  • 1
Oleg Vazhnev
  • 23,239
  • 54
  • 171
  • 305
  • Eiter something is terribly broken or your testcode and output just don't match: Count == 0 after the first Add() ? – H H May 01 '11 at 11:56
  • @Henk, he's using `BlockingCollection`, to which he gives this collection as a constructor argument. So when he calls `Add()`, the `BlockingCollection` calls `TryAdd()` of his class. – svick May 01 '11 at 14:23
  • @svick, OK, thanks, I missed that. – H H May 01 '11 at 15:23

1 Answers1

3

BlockingCollection maintains its own count and doesn't use count of underlying ConcurrentSet of yours therefore even if duplicates are ignored, the count increases.

You might want to write your own wrapper around this collection which returns count from the concurrentset and relays every other method property to the blocking collection.

Muhammad Hasan Khan
  • 34,648
  • 16
  • 88
  • 131