I'm using a ConcurrentQueue<Result> to store results of a certain activity that occurs in my system (ASP.NET Core 6). It works well.

But now I have a new requirement: I need a Result.Id property - a long value, similar to a sequential database primary key. When I enqueue an item, it must get the next value in the sequence, i.e. queue.Count + 1.

When I enqueue items, I don't use locks because ConcurrentQueue does that for me. But now that I need the Id, I must:

  • lock access to Enqueue()
  • determine the queue's size
  • increment by one
  • set the item's Id
  • enqueue the item

or:

  • lock access to Enqueue()
  • enqueue the item
  • determine the queue's new size
  • update the item's Id (but it will be in an inconsistent state until then)

Can I do this without locking the Enqueue() method? Or maybe there's a better way? (Though I prefer to continue using ConcurrentQueue, if possible, as it already works well for what I have.)
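
For reference, the first approach would look roughly like this (a minimal sketch; the wrapper class and the simplified Result are just for illustration):

using System.Collections.Concurrent;

public class Result
{
    public long Id { get; set; }
}

public class ResultQueue
{
    private readonly object _gate = new();
    private readonly ConcurrentQueue<Result> _queue = new();

    // The first approach: lock, read Count, assign the Id, then enqueue.
    public void Enqueue(Result result)
    {
        lock (_gate)
        {
            result.Id = _queue.Count + 1;
            _queue.Enqueue(result);
        }
    }
}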

lonix
  • What will you do to the queue next? You can set the Id when the item is dequeued. – shingo Jun 29 '22 at 04:01
  • Maybe you don't need a queue, just an array and an int indicating the last id. Every time you want to insert a result, use an interlocked method to increment the id and set the result at that position. – shingo Jun 29 '22 at 04:09
  • @shingo Thanks. If I was starting from zero, I would do it your way. The problem is the queue is needed for other reasons too - so if possible, I need to keep it. But thank you for giving me another option to think about. :) – lonix Jun 29 '22 at 04:15
  • I'm a little bit confused about a _fixed-size_ queue in contrast to enqueueing items. <- doesn't matter. The id is a long value. What about using `DateTime.UtcNow.Ticks` as the `id`? That's a `long` value, always increasing, and it can be initialized without knowing the current size of the queue. There might be a race condition that creates the same id if concurrent threads acquire the exact same timestamp. It's just an idea... – Sebastian Schumann Jun 29 '22 at 04:29
  • @SebastianSchumann Actually my concern with ticks was different now that I think of it. The problem is you could get two threads trying to enqueue, and the one with the newer DateTime.Ticks enters the lock first, so is enqueued first - so the items will be in the wrong order. Not bad for my needs - still better than a lock - but maybe there's a cleaner way, I hope. – lonix Jun 29 '22 at 04:37
  • Oh yeah - you're right. The race condition of enqueueing items in the wrong order is still there. Hm. We're using non-blocking exchange of immutable objects by creating completely new instances. That doesn't require any `Concurrent` collection because all items are immutable and exchanged using `Interlocked.CompareExchange`. You can easily create a static helper method that takes a factory and hides the non-blocking stuff. The problem is that this requires a rewrite of your current usages - and I read out of your comments that this is not an option. – Sebastian Schumann Jun 29 '22 at 04:58
  • I suggest reconsidering your implementation; if you read [ConcurrentQueue.Count](https://source.dot.net/#System.Private.CoreLib/ConcurrentQueue.cs,18bcbcbdddbcfdcb) you may be surprised by its abnormal complexity, so a normal queue + lock may even be better. – shingo Jun 29 '22 at 05:14
  • What operations are you performing on the `ConcurrentQueue`? Only `Enqueue` and enumeration? No `Dequeue`? – Theodor Zoulias Jun 29 '22 at 05:29
  • @TheodorZoulias Yes it has dequeue as well. It's a fixed-size queue, so before enqueuing I make sure to dequeue to the correct size (there is a race condition there, but I prefer it to using a lock, as the side effects don't matter to me). – lonix Jun 29 '22 at 08:31
  • @shingo You were not joking, Count is really complicated! I think I will reconsider my design to just avoid this problem altogether, thank you. – lonix Jun 29 '22 at 08:33
  • Hmm, I don't get it. Each time you `Dequeue`, the `Count` decreases by one. So if you interleave enqueues and dequeues, and use the `Count + 1` after each enqueue as ID, you'll get duplicate IDs. The queue can easily contain two items with the same ID. Isn't this a problem? – Theodor Zoulias Jun 29 '22 at 08:50
  • @TheodorZoulias It is a problem, and there are other race conditions and side effects too. I was convinced by the comments above and the answer below to redesign, probably using a simpler collection and doing the locking myself. – lonix Jun 29 '22 at 09:20
  • If you end up using a simple `lock`-protected `Queue`, be careful with the enumeration. It's the tricky part. `Enqueue`, `Dequeue`, and unique ID assignment will be easy and efficient, but for the enumeration you might have to copy the whole thing into an array before each `foreach`. – Theodor Zoulias Jun 29 '22 at 09:31
  • @TheodorZoulias Thank you for the tip about queue+lock; that is what I'll do. But I'm unsure about the enumeration problem: is it an accuracy problem or a deadlock problem? – lonix Jun 29 '22 at 09:40
  • If you get an `IEnumerator` and start enumerating it while the `Queue` is mutated in parallel by other threads, the behavior is undefined. You might get random exceptions, corrupted data, whatever. So protecting with `lock` the `GetEnumerator` method alone is not enough. – Theodor Zoulias Jun 29 '22 at 10:10
  • @TheodorZoulias Your comment resulted in lots of research and yak shaving (thanks / nothanks :-)) and I realised it was an even bigger problem than the one here. So I moved it to a [different](https://stackoverflow.com/questions/72800733/thread-safe-fixed-size-circular-buffer-with-sequence-ids) question. Thanks for your insights. – lonix Jun 29 '22 at 11:36
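
A minimal sketch of the `lock`-protected `Queue<T>` approach settled on in the comments above (the ResultLog class and its member names are illustrative):

using System.Collections.Generic;

public class Result
{
    public long Id { get; set; }
}

public class ResultLog
{
    private readonly object _gate = new();
    private readonly Queue<Result> _queue = new();
    private long _nextId;

    public void Enqueue(Result result)
    {
        lock (_gate)
        {
            // The id is monotonic and independent of Count, so it stays
            // unique even when items are dequeued.
            result.Id = ++_nextId;
            _queue.Enqueue(result);
        }
    }

    public bool TryDequeue(out Result result)
    {
        lock (_gate) return _queue.TryDequeue(out result);
    }

    // Copy under the lock, so callers can safely enumerate the snapshot
    // while other threads keep mutating the queue.
    public Result[] Snapshot()
    {
        lock (_gate) return _queue.ToArray();
    }
}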

2 Answers


Instead of assigning the Id when adding the items, you can assign it after you have finished adding items to the queue. You can use a Select with index, e.g.:

public class Item {
    public int Id { get; set; }
    public int Value { get; set; }
}

public static void Main()
{
    var q = new ConcurrentQueue<Item>();
    Parallel.For(0, 10, i => q.Enqueue(new Item() { Value = i + 100 }));
    // Assign ids by position. Note that Select is deferred: the ids are
    // (re)assigned every time this sequence is enumerated.
    var withIds = q.Select((x, i) => {
        x.Id = i;
        return x;
    });
    
    Console.WriteLine(string.Join(", ", withIds.Select(x => string.Format("{0} -> {1}", x.Id, x.Value))));
}

This leads to the following output:

0 -> 100, 1 -> 102, 2 -> 103, 3 -> 105, 4 -> 101, 5 -> 106, 6 -> 107, 7 -> 104, 8 -> 108, 9 -> 109

Depending on the size of the queue and the number of times you iterate it, you can also add a ToArray after the Select in order to avoid assigning the Ids multiple times.
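
For example, materializing once:

var withIds = q.Select((x, i) => {
    x.Id = i;
    return x;
}).ToArray(); // ids are assigned a single time, at the cost of an extra array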

It is important to note that this only works if you can clearly separate the phase of adding items to the queue from the phase of processing it.

Markus
  • Thank you Markus, that is a very clever approach! I am still trying to understand the tricky bits. Am I right in assuming 1) this enumerates the queue on every enqueue, 2) between the enqueue and the Select, the last item is in an inconsistent state (that is what you meant by "clearly separate the phase...")? Even so, despite these issues it's still a nice way to avoid a lock in my case, as the side effects are not too bad for my use case. – lonix Jun 29 '22 at 08:38
  • @lonix no, the queue is enumerated only when the `Select` is executed ("LINQ deferred execution"). If you only want to enumerate it once, add a `.ToArray()` after the `Select`, but keep in mind that this means more memory consumption. By "separate the phases" I mean that if you enqueue items, enumerate them, and then enqueue or dequeue again, you might get different ids, because the id is only assigned in the `Select`. In the sample, the phases are separated - first the items are enqueued in the `Parallel.For`. When this is finished, the ids are assigned. After that, no items are de- or enqueued. – Markus Jun 29 '22 at 09:08
  • Ah I see, I need to enqueue items one by one and ensure the ids are correct at any given point in time, so the enumeration must be performed each time. I think there are drawbacks or race conditions no matter which solution I choose, maybe a ConcurrentQueue is the wrong data structure for my use case. But your solution does answer the question, thank you! – lonix Jun 29 '22 at 09:17

You could consider storing the IDs in Lazy<long> instances, and make these instances a part of the state of your Result type. Or you could store them side by side in the ConcurrentQueue<T> in value tuples, like this:

long idSeed = 0;
// Atomically produces the next id in the sequence.
Func<long> idFactory = () => Interlocked.Increment(ref idSeed);

ConcurrentQueue<(Result, Lazy<long>)> queue = new();

//...

// No id is generated at this point; it is generated on the first read
// of the Lazy<long>.Value property.
queue.Enqueue((new Result(), new Lazy<long>(idFactory)));

//...

if (queue.TryDequeue(out var entry))
{
    var (result, lazy) = entry;
    long id = lazy.Value;
}

The Lazy<T> class guarantees that the valueFactory will be invoked only once for each Lazy<T> instance. So even if multiple threads are racing to read the ID of a specific Result, all threads will get the same ID.
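
A quick (hypothetical) demo of that guarantee, reusing the idFactory from above:

var lazyId = new Lazy<long>(idFactory);

long[] observed = new long[4];
Parallel.For(0, 4, i => observed[i] = lazyId.Value);

// All four threads observe the same id, because the factory ran only once.
Console.WriteLine(string.Join(", ", observed));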

Theodor Zoulias
  • Thanks, that is another good and very clever approach! – lonix Jun 29 '22 at 11:38
  • @lonix you could also take a look at the [source code](https://github.com/dotnet/runtime/blob/cd553b7208db0df889d8fbb325de65182443bb31/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs#L1194-L1206) of the `Task` class, and see how Microsoft has implemented the [`Id`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.task.id) property. – Theodor Zoulias Jun 29 '22 at 20:15
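
The pattern in that linked source is roughly the following (a simplified sketch, not the exact Task code): the id is assigned lazily on first read, Interlocked.Increment generates it, and Interlocked.CompareExchange publishes it so that only the first racing thread wins.

using System.Threading;

public class Result
{
    private static int s_idCounter;
    private int _id; // 0 means "no id assigned yet"

    public int Id
    {
        get
        {
            if (_id == 0)
            {
                int newId;
                // Skip 0 so it can keep meaning "unassigned".
                do { newId = Interlocked.Increment(ref s_idCounter); }
                while (newId == 0);

                // Publish atomically; if another thread raced us here,
                // its id wins and ours is discarded.
                Interlocked.CompareExchange(ref _id, newId, 0);
            }
            return _id;
        }
    }
}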