1

Suppose I have a Queue of Tasks, and each Task have a locking object (syncObject) which controls shared resource access, Queue can have multiple Task which share same instances of syncObject. And I have N concurrent threads that should dequeue Tasks and proccess them in queue order, this means acquire lock on syncObject in the order of queue.

Code explanation:

abstract class Task
{
    public readonly Object SyncObject = new Object();
}

Queue<Task> taskQueue = new Queue<Task>();
Object queueLock = new Object();

void TakeItemThreadedMethod()
{
    Task task;
    lock(queueLock) task = taskQueue.Dequeue();
    //Between this lines is my problem,
    //Other thread can dequeue next task and it may share same syncObject and
    //acquire lock on it before this thread, thought this task was first in queue
    lock(task.SyncObject)
    {
        //Do some work here
    }
}

How to start proccessing Tasks (acquire Task.SyncObject lock) that share the same SyncObject in the order they were in Queue.

Alex Burtsev
  • 12,418
  • 8
  • 60
  • 87
  • I must be misunderstanding something, but if you have to process the tasks in order, then using many threads won't help as only 1 thread will run while the rest block. – ScottS Nov 01 '12 at 07:03
  • 1
    Instead of having "Tasks sharing same instances of syncObject", you can use different queues for them. – L.B Nov 01 '12 at 07:13
  • @L.B: Glad to hear my idea wasn't a crazy one :) I must have been writing it up when you added that comment... – Jon Skeet Nov 01 '12 at 07:21

3 Answers3

2

It sounds like potentially your queue shouldn't contain individual tasks - but queues of tasks, where each subqueue is "all the tasks which share a sync-lock".

Your processor would therefore:

  • Take a subqueue off the main queue
  • Dequeue the first task off the subqueue and process it
  • When it's finished put the subqueue back at the end of the main queue (or anywhere, actually - work out how you want the scheduling to work)

This will ensure that only one task per subqueue is ever executed at a time.

You'll probably need a map from lock to subqueue, so that anything creating work can add it to the right subqueue. You'd need to atomically work out when to remove a subqueue from the map (and not put it back on the main queue), assuming you require that functionality at all.

EDIT: As an optimization for the above, you could put the subqueue itself into whatever you're using as the shared sync lock. It could have a reference to either "the single task to next execute" or "a queue of tasks" - only creating the queue lazily. You'd then put the sync lock (which wouldn't actually need to be used as a lock any more) on the queue, and each consumer would just ask it for the next task to execute. If only a single task is available, it's returned (and the "next task" variable set to null). If there are multiple tasks available, the first is dequeued.

When a producer adds a new task, either the "first task" variable is set to the task to execute if it was previously null, or a queue is created if there wasn't a queue but was already a task, or the queue is just added to if one already existed. That solves the inefficiency of unnecessary queue creation.

Again, the tricky part will be working out how to atomically throw away the shared resource lock - because you only want to do so after processing the last item, but equally you don't want to miss a task because you happened to add it at the wrong time. It shouldn't be too bad, but equally you'll need to think about it carefully.

Jon Skeet
  • 1,421,763
  • 867
  • 9,128
  • 9,194
  • This is not suitable, because 1. I want to control number of worker threads, 2. Situation when Tasks share some resource is very rare, I will have tons of queue's with 1 item and tons of threads – Alex Burtsev Nov 01 '12 at 07:17
  • 1
    @AlexBurtsev: I don't see how 1) is an issue - you can still control the number of worker threads pulling subqueues off the main queue. As for 2) - do you have any evidence that the slight inefficiency introduced will *actually* be significant? In general I'd rather have a working-but-inefficient solution than broken code... (Note that if you're going to add restrictions on what constitutes a suitable answer, it would be helpful to state those restrictions in the question.) – Jon Skeet Nov 01 '12 at 07:20
  • @AlexBurtsev: See my edit for an option for how to do this efficiently. – Jon Skeet Nov 01 '12 at 07:27
  • I guess, you are right about issue 1). I agree that this is a possible solution, but the global order of Tasks is important for me, even when Tasks do not share any locks I would like to preserve processing order or make it as close to queue order as possible. – Alex Burtsev Nov 01 '12 at 07:27
  • @AlexBurtsev: Again, this was a requirement you never mentioned in the original post - and one which is basically somewhat at odds with having multiple consumer threads. However, if it's relatively rare for tasks to share the same lock, they *will* be dequeued in order anyway. When you create a task for an existing resource, it could end up executing earlier or later than otherwise. – Jon Skeet Nov 01 '12 at 07:35
2

How about this approach:

  • use a list instead of a queue
  • have each worker thread loop in order through the queue until it can find an "unlocked" task

Something like (untested):

abstract class Task
{
    public readonly Object SyncObject = new Object();
}

List<Task> taskList = new List<Task>();

void TakeItemThreadedMethod()
{
    Task task = null;
    bool found = false;

    try
    {
        // loop until found an task whose SyncObject is free
        while (!found)
        {
            lock (taskList)
            {
                for (int i = 0; i < taskList.Count; i++)
                {
                    object syncObj = taskList[i].SyncObject;
                    if (found = Monitor.TryEnter(syncObj))
                    {
                        for (int x = 0; x < taskList.Count; x++)
                        {
                            if (Object.ReferenceEquals(
                                    syncObj, taskList[x].SyncObject))
                            {
                                task = taskList[x];
                                taskList.RemoveAt(x);
                                break;
                            }
                        }
                        break;
                    }
                }
            }
        }

        // process the task...
        DoWork(task);
    }
    finally
    {
        if (found) Monitor.Exit(task.SyncObject);
    }
}

void QueueTask(Task task)
{
    lock (taskList)
    {
        taskList.Add(task);
    }
}
Eren Ersönmez
  • 38,383
  • 7
  • 71
  • 92
  • Very interesting solution, at first sight, it looks like that it should work, and it is very simple and efficient, I need some time to analyze and test it. Thank you. – Alex Burtsev Nov 01 '12 at 08:43
  • Nice try, but there is a flaw in this algorithm. While your search loop is working locks can be taken and released on tasks by other threads. Suppose you have a task queue [1,2,3,1] Lock1 is taken by some worker, your algorithm ignored it and currently trying Lock2 (suppose 2,3 are taken), at this moment Lock1 is released, and your algorithm will choose last Task with Lock1, and it will be proccessed before first Task with Lock1 – Alex Burtsev Nov 01 '12 at 11:14
  • Uh, nice catch. Though it should be an easy fix. You just need to go back and find the first Task with the same `SyncObject`. I updated the code. – Eren Ersönmez Nov 01 '12 at 12:36
1

I have used QueuedLock class suggested by Matthew Brindley, with slight modification, I have split up Enter function to TakeTicket and Enter which blocks.

Now I can use TakeTicket inside shared QueueLock without blocking whole queue.

Modified code:

abstract class Task
{
    public readonly QueuedLock SyncObject = new QueuedLock();
}

Queue<Task> taskQueue = new Queue<Task>();
Object queueLock = new Object();

void TakeItemThreadedMethod()
{
    Task task;
    int ticket;
    lock(queueLock) 
    {
        task = taskQueue.Dequeue();
        ticket = task.SyncObject.TakeTicket();
    }
    task.SyncObject.Enter(ticket);
    //Do some work here
    task.SyncObject.Exit();
}
Community
  • 1
  • 1
Alex Burtsev
  • 12,418
  • 8
  • 60
  • 87
  • Essentially this is what Jon Skeet suggested, I have Queue of Queue's which I have integrated into LockObject. – Alex Burtsev Nov 01 '12 at 08:29
  • Yes, I think this could work (and it's something I was potentially going to suggest, but didn't want to keep putting effort into a question with changing requirements). You'd want to put `Exit` into a `finally` block though. – Jon Skeet Nov 01 '12 at 09:27