1

I would like to create a LINQ to SQL backed queue. However, I am unsure what the best way to go about it is.

Right now I've done it something like this:

public static void Queue(Item item)
{
    var db = new MyDataContext();

    item.Time = DateTime.Now;

    db.Items.InsertOnSubmit(item);

    db.SubmitChanges();
}

public static Item TryDequeue()
{
    try
    {
        var db = new MyDataContext();

        var item = db.Items
            .Where(x => x.Status == 0)
            .OrderBy(x => x.Time)
            .FirstOrDefault();

        if (item == null)
            return null;

        item.Status += 1;

        db.SubmitChanges();

        return item;
    }
    catch (ChangeConflictException)
    {
        return null;
    }
}

However, I do get some ChangeConflictExceptions.

I was hoping the resulting query would be an atomic transaction that picks the element, sets its status and then returns it without any conflicts, but that doesn't seem to be the case. I've tried using TransactionScope, but it complains about deadlocks.

What is the best way to go about achieving this?

ronag

3 Answers

2

You could continue past the concurrency conflict by doing this:

db.SubmitChanges(ConflictMode.ContinueOnConflict);

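But please understand its pitfalls: conflicts are accumulated and reported once all updates have been attempted, so they still have to be resolved afterwards.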

Prabhu Murthy
1

This sounds like you're running TryDequeue on multiple threads concurrently.

Have you considered leaving queue management to a single master thread which then hands work out to others as/when they become available? You're always going to have problems with multiple threads updating the same database records.

Even if you forced everything inside a transaction, threads would start blocking, waiting for other threads to check the work queue (and a db operation is slow in terms of multithreading). This would quickly result in a bottleneck.

An alternative would be to have an in-memory cache in a thread-safe collection. Queue items would be loaded into the collection and threads could dequeue from there. Every so often (or when the queue is empty), the first thread through the method could block, flush the changes in the cache back to the database, load a new batch of queue items, then dequeue as normal and stop blocking. This reduces your number of database trips considerably and should avoid the conflict issues, since only a single thread will perform the updates at any given time. The downside, of course, is that there's now a delay between the state of a job being updated and that change being stored in the database. Whether that is an issue depends on your requirements.
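The cache idea above could be sketched roughly like this. It is only a minimal illustration under assumptions: `CachedQueue`, the `claimBatch` delegate and `batchSize` are hypothetical names, `Item` is a stand-in for the entity in the question, and `claimBatch` is assumed to mark a batch of rows as taken in the database and return them. Flushing status changes back to the database is left out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical stand-in for the LINQ to SQL entity from the question.
public class Item
{
    public int Id { get; set; }
    public int Status { get; set; }
    public DateTime Time { get; set; }
}

public class CachedQueue
{
    private readonly ConcurrentQueue<Item> _cache = new ConcurrentQueue<Item>();
    private readonly object _refillLock = new object();

    // Assumption: claims up to N pending rows in the database and returns them.
    private readonly Func<int, IList<Item>> _claimBatch;
    private readonly int _batchSize;

    public CachedQueue(Func<int, IList<Item>> claimBatch, int batchSize)
    {
        _claimBatch = claimBatch;
        _batchSize = batchSize;
    }

    public Item TryDequeue()
    {
        Item item;

        // Fast path: no database trip while the cache still has items.
        if (_cache.TryDequeue(out item))
            return item;

        // Slow path: only one thread at a time refills from the database.
        lock (_refillLock)
        {
            // Another thread may have refilled while this one was waiting.
            if (_cache.TryDequeue(out item))
                return item;

            foreach (var loaded in _claimBatch(_batchSize))
                _cache.Enqueue(loaded);

            return _cache.TryDequeue(out item) ? item : null;
        }
    }
}
```

Because only the thread holding `_refillLock` talks to the database, concurrent updates of the same rows from within this process are avoided; other processes sharing the table would still need their own coordination.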

The quick 'n dirty fix which will resolve the current error but not the underlying problem would be to lock and so limit threads moving through your Dequeue method...

// Must be static, because it is used from a static method.
private static readonly object dequeueLock = new object();
public static Item TryDequeue()
{
    lock (dequeueLock)
    {
        try
        {
            var db = new MyDataContext();

            var item = db.Items
                .Where(x => x.Status == 0)
                .OrderBy(x => x.Time)
                .FirstOrDefault();

            if (item == null)
                return null;

            item.Status += 1;

            db.SubmitChanges();

            return item;
        }
        catch (ChangeConflictException)
        {
            return null;
        }
    }
}
Basic
  • Is there no "atomic fetch and set" functionality in SQL? – ronag Nov 01 '12 at 11:51
  • Not really without issuing SQL commands directly. There's a really good explanation of why not here... http://www.ladislavmrnka.com/2012/09/entity-framework-and-pessimistic-concurrency - be aware there may be a serious impact on performance depending on how many threads you're using – Basic Nov 01 '12 at 21:27
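For illustration, issuing the SQL directly could look something like the sketch below. It assumes SQL Server, a table named `Items` whose `Status` and `Time` columns match the entity in the question, and no enabled triggers on that table (a plain `OUTPUT` clause requires that). `QueueSql` and `DequeueSql` are hypothetical names; `DataContext.ExecuteQuery<TResult>` is the existing LINQ to SQL method for running raw SQL.

```csharp
using System.Data.Linq;
using System.Linq;

public static class QueueSql
{
    // Assumption: table "Items" with columns "Status" and "Time".
    // READPAST skips rows locked by other dequeuers instead of blocking,
    // UPDLOCK and ROWLOCK lock only the selected row until the update completes.
    public const string DequeueSql = @"
        WITH next AS (
            SELECT TOP (1) *
            FROM Items WITH (ROWLOCK, READPAST, UPDLOCK)
            WHERE Status = 0
            ORDER BY Time
        )
        UPDATE next
        SET Status = Status + 1
        OUTPUT inserted.*;";

    // Selects and updates the oldest pending row in one statement and
    // returns it, or null when no pending row is available.
    public static T TryDequeue<T>(DataContext db) where T : class
    {
        return db.ExecuteQuery<T>(DequeueSql).FirstOrDefault();
    }
}
```

With the types from the question this would be called as `QueueSql.TryDequeue<Item>(new MyDataContext())`. Since the select and the update happen in a single statement, two callers should not receive the same row, though ordering is only approximate when rows are skipped via `READPAST`.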
-1

Try this...

private static readonly object _lock = new object();

public static void Queue(Item item)
{
    // Acquire the lock before the try so that finally only releases a lock that is held.
    Monitor.Enter(_lock);
    try
    {
        var db = new MyDataContext();

        item.Time = DateTime.Now;

        db.Items.InsertOnSubmit(item);

        db.SubmitChanges();
    }
    finally
    {
        Monitor.Exit(_lock);
    }
}

public static Item TryDequeue()
{
    // Acquire the lock before the try so that finally only releases a lock that is held.
    Monitor.Enter(_lock);
    try
    {
        var db = new MyDataContext();

        var item = db.Items
            .Where(x => x.Status == 0)
            .OrderBy(x => x.Time)
            .FirstOrDefault();

        if (item == null)
            return null;

        item.Status += 1;

        db.SubmitChanges();

        return item;
    }
    catch (ChangeConflictException)
    {
        return null;
    }
    finally
    {
        Monitor.Exit(_lock);
    }
}
Kevin