7

Is there any prior work on adding tasks to the TPL runtime with a varying priority?

If not, generally speaking, how would I implement this?

Ideally I plan on using the producer-consumer pattern to add "todo" work to the TPL. There may be times where I discover that a low priority job needs to be upgraded to a high priority job (relative to the others).

If anyone has some search keywords I should use when searching for this, please mention them, since I haven't yet found code that will do what I need.

svick
makerofthings7
  • Do you want to have just a low/high, do you want to have a small fixed number of priorities, i.e. Highest, high, normal, low, lowest, or do you want to be able to give each task a priority between like 1 and 100, or between 1 and some very large number? The first is quite easy, the second not so much. – Servy Feb 18 '13 at 16:40
  • If you don't find prior work, here's a guide to implementing custom blocks: http://blogs.msdn.com/b/pfxteam/archive/2011/12/05/10244302.aspx – Eric J. Feb 18 '13 at 16:40
  • @EricJ. I don't see how custom dataflow blocks are relevant to the question. – svick Feb 18 '13 at 16:42
  • afaik Dataflow is a subset of TPL that doesn't apply to general TPL usage. Good to know it exists for other work. – makerofthings7 Feb 18 '13 at 16:43
  • Related: [Concurrent collection with priority](https://stackoverflow.com/questions/23470196/concurrent-collection-with-priority) – Theodor Zoulias Apr 14 '22 at 08:52

2 Answers

4

So here is a rather naive concurrent implementation around a rather naive priority queue. The idea is that a sorted set holds pairs of the real item and its priority, and is given a comparer that orders by priority first and falls back to comparing the items themselves to break ties. Items with the lowest priority value are taken first. The constructor takes a function that computes the priority for a given object, plus an optional comparer for the items.

As for the actual implementation, it is not efficient: I just lock around everything. A more efficient implementation would rule out using SortedSet as the priority queue, and re-implementing a priority queue that can be accessed concurrently in an effective way is not going to be that easy.

In order to change the priority of an item you'll need to remove the item from the set and then add it again, and to find it without iterating the whole set you'd need to know the old priority as well as the new priority.
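The remove-then-re-add step can be tried in isolation on a plain SortedSet. The snippet below is only a minimal sketch: the `ReprioritizeDemo` class, the string items, and the priority numbers are made up for illustration, and there is no locking because a single thread is assumed.

```csharp
using System;
using System.Collections.Generic;

public static class ReprioritizeDemo
{
    public static string Run()
    {
        // Order by priority (Item2) first, then by the item itself to break ties.
        var byPriority = Comparer<Tuple<string, int>>.Create((a, b) =>
        {
            int c = a.Item2.CompareTo(b.Item2);
            return c != 0 ? c : string.CompareOrdinal(a.Item1, b.Item1);
        });

        var set = new SortedSet<Tuple<string, int>>(byPriority);
        set.Add(Tuple.Create("backup", 5));
        set.Add(Tuple.Create("email", 2));

        // Upgrade "backup" from priority 5 to 1. The old priority is needed
        // so that Remove can locate the pair through the comparer.
        if (set.Remove(Tuple.Create("backup", 5)))
            set.Add(Tuple.Create("backup", 1));

        // The smallest priority value sorts first.
        return set.Min.Item1;
    }
}
```

Here `Run()` returns "backup", because after the re-add its priority value (1) is smaller than that of "email" (2).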

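using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

public class ConcurrentPriorityQueue<T> : IProducerConsumerCollection<T>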
{
    private object key = new object();
    private SortedSet<Tuple<T, int>> set;

    private Func<T, int> prioritySelector;

    public ConcurrentPriorityQueue(Func<T, int> prioritySelector, IComparer<T> comparer = null)
    {
        this.prioritySelector = prioritySelector;
        set = new SortedSet<Tuple<T, int>>(
            new MyComparer(comparer ?? Comparer<T>.Default));
    }

    // Nested classes can already use the outer T, so no separate type parameter is needed.
    private class MyComparer : IComparer<Tuple<T, int>>
    {
        private IComparer<T> comparer;
        public MyComparer(IComparer<T> comparer)
        {
            this.comparer = comparer;
        }
        public int Compare(Tuple<T, int> first, Tuple<T, int> second)
        {
            var returnValue = first.Item2.CompareTo(second.Item2);
            if (returnValue == 0)
                returnValue = comparer.Compare(first.Item1, second.Item1);
            return returnValue;
        }
    }

    public bool TryAdd(T item)
    {
        lock (key)
        {
            return set.Add(Tuple.Create(item, prioritySelector(item)));
        }
    }

    public bool TryTake(out T item)
    {
        lock (key)
        {
            if (set.Count > 0)
            {
                var first = set.First();
                item = first.Item1;
                return set.Remove(first);
            }
            else
            {
                item = default(T);
                return false;
            }
        }
    }

    public bool ChangePriority(T item, int oldPriority, int newPriority)
    {
        lock (key)
        {
            if (set.Remove(Tuple.Create(item, oldPriority)))
            {
                return set.Add(Tuple.Create(item, newPriority));
            }
            else
                return false;
        }
    }

    public bool ChangePriority(T item)
    {
        lock (key)
        {
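            // Linear scan: the current priority is unknown, so the sort order cannot help here.
            var result = set.FirstOrDefault(pair => object.Equals(pair.Item1, item));

            // FirstOrDefault returns null when no pair matches, so check before dereferencing.
            if (result != null)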
            {
                return ChangePriority(item, result.Item2, prioritySelector(item));
            }
            else
            {
                return false;
            }
        }
    }

    public void CopyTo(T[] array, int index)
    {
        lock (key)
        {
            foreach (var item in set.Select(pair => pair.Item1))
            {
                array[index++] = item;
            }
        }
    }

    public T[] ToArray()
    {
        lock (key)
        {
            return set.Select(pair => pair.Item1).ToArray();
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        return ToArray().AsEnumerable().GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public void CopyTo(Array array, int index)
    {
        lock (key)
        {
            foreach (var item in set.Select(pair => pair.Item1))
            {
                array.SetValue(item, index++);
            }
        }
    }

    public int Count
    {
        get { lock (key) { return set.Count; } }
    }

    public bool IsSynchronized
    {
        get { return true; }
    }

    public object SyncRoot
    {
        get { return key; }
    }
}

Once you have an IProducerConsumerCollection<T> instance, which the above object is, you can use it as the internal backing collection of a BlockingCollection<T> to get an easier-to-use interface.
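As a rough illustration of that wrapping, here is a sketch that compiles on its own. `MinFirstCollection` and `BlockingDemo` are hypothetical names, and the collection is a deliberately tiny stand-in for the class above (ints only, the value itself acts as the priority), not a replacement for it.

```csharp
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the priority queue above: smallest int is taken first.
public sealed class MinFirstCollection : IProducerConsumerCollection<int>
{
    private readonly object gate = new object();
    private readonly SortedSet<int> set = new SortedSet<int>();

    public bool TryAdd(int item) { lock (gate) return set.Add(item); }

    public bool TryTake(out int item)
    {
        lock (gate)
        {
            if (set.Count == 0) { item = 0; return false; }
            item = set.Min;
            return set.Remove(item);
        }
    }

    public int[] ToArray() { lock (gate) return set.ToArray(); }
    public void CopyTo(int[] array, int index) { lock (gate) set.CopyTo(array, index); }
    public void CopyTo(Array array, int index) { ToArray().CopyTo(array, index); }
    public IEnumerator<int> GetEnumerator() { return ((IEnumerable<int>)ToArray()).GetEnumerator(); }
    IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
    public int Count { get { lock (gate) return set.Count; } }
    public bool IsSynchronized { get { return true; } }
    public object SyncRoot { get { return gate; } }
}

public static class BlockingDemo
{
    public static List<int> Run()
    {
        // BlockingCollection adds blocking Take/Add and completion on top of the backing collection.
        var queue = new BlockingCollection<int>(new MinFirstCollection());
        queue.Add(30);
        queue.Add(10);
        queue.Add(20);
        queue.CompleteAdding();

        // Items come out in the order the backing collection's TryTake yields them: 10, 20, 30.
        return queue.GetConsumingEnumerable().ToList();
    }
}
```

One thing to watch for, as far as I know: `BlockingCollection<T>.Add` throws an `InvalidOperationException` when the backing collection's `TryAdd` returns false, which a set-based collection does for duplicates.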

Servy
  • Combining this with `BlockingCollection` would provide a weird interface, I think. You would need to add and remove through the `BlockingCollection`, but change priority through `ConcurrentPriorityQueue`. I think that should be encapsulated. – svick Feb 18 '13 at 17:19
  • @svick Ideally, yes. Do you see another way around the issue, other than basically re-writing `BlockingCollection` but with some additional methods? – Servy Feb 18 '13 at 17:24
  • No, I don't see another way, I think doing it this way (with the additional encapsulation) is the right way if you want to do it like this. – svick Feb 18 '13 at 17:26
  • Also, it seems that when two items are considered equal by the comparer, `SortedSet` considers them completely equal. That means you can't have two different items with the same priority in the queue. You can fix that by sorting by `Item2` first and then by `Item1` (which requires `T` to be comparable). – svick Feb 18 '13 at 17:28
  • Instead of `var first = set.First();` I would prefer `var first = set.Min`. The [`Min`](https://learn.microsoft.com/en-us/dotnet/api/system.collections.generic.sortedset-1.min) property should be slightly more efficient. – Theodor Zoulias Apr 15 '22 at 02:19
3

ParallelExtensionsExtras contains several custom TaskSchedulers that could be helpful either directly or as a base for your own scheduler.

Specifically, there are two schedulers that may be interesting for you:

  • QueuedTaskScheduler, which allows you to schedule Tasks at different priorities, but doesn't allow changing the priority of enqueued Tasks.
  • ReprioritizableTaskScheduler, which doesn't have different priorities, but allows you to move a specific Task to the front or to the back of the queue. (Though changing priority is O(n) in the number of currently waiting Tasks, which could be a problem if you had many Tasks at the same time.)
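To give a feel for what "a base for your own scheduler" could look like, here is a minimal sketch of a scheduler that lets a waiting Task be moved to the front. It is not the ParallelExtensionsExtras code: the `FrontableTaskScheduler` name and its `Prioritize` method are made up for this example, and it simply runs work on the thread pool with no concurrency limit.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: tasks wait in a linked list and can be moved to its front.
public sealed class FrontableTaskScheduler : TaskScheduler
{
    private readonly LinkedList<Task> tasks = new LinkedList<Task>();
    private readonly object gate = new object();

    protected override void QueueTask(Task task)
    {
        lock (gate) tasks.AddLast(task);
        // One work item per queued task; each runs whatever is first at that moment.
        ThreadPool.QueueUserWorkItem(_ => RunNext());
    }

    private void RunNext()
    {
        Task next;
        lock (gate)
        {
            if (tasks.Count == 0) return;
            next = tasks.First.Value;
            tasks.RemoveFirst();
        }
        TryExecuteTask(next);
    }

    // Moves a still-waiting task to the front. O(n) in the number of waiting tasks.
    public bool Prioritize(Task task)
    {
        lock (gate)
        {
            if (!tasks.Remove(task)) return false; // already started or never queued here
            tasks.AddFirst(task);
            return true;
        }
    }

    // Inlining is refused so that queue order is the only thing deciding what runs next.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        lock (gate) return tasks.ToArray();
    }
}
```

A task would be started on it with `Task.Factory.StartNew(work, CancellationToken.None, TaskCreationOptions.None, scheduler)`. Since the thread pool may run several work items at once, reordering only has a visible effect while tasks are actually waiting in the list.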
mafu
svick