
I have the following code (which doesn't work very well in a multithreaded environment):

public class SomeClass
{
    private readonly ConcurrentQueue<ISocketWriterJob> _writeQueue = new ConcurrentQueue<ISocketWriterJob>();
    private ISocketWriterJob _currentJob;

    public void Send(ISocketWriterJob job)
    {
        if (_currentJob != null)
        {
            _writeQueue.Enqueue(job);
            return;
        }

        _currentJob = job;
        _currentJob.Write(_writeArgs);

        // The job is invoked asynchronously here
    }

    private void HandleWriteCompleted(SocketError error, int bytesTransferred)
    {
        // error checks etc removed for this sample.

        if (_currentJob.WriteCompleted(bytesTransferred))
        {
            _currentJob.Dispose();
            if (!_writeQueue.TryDequeue(out _currentJob))
            {
                _currentJob = null;
                return;
            }
        }

        _currentJob.Write(_writeArgs);

        // the job is invoked asynchronously here.
    }
}

The Send method should invoke the job asynchronously if there isn't a current job being executed. It should enqueue the job if there is.

Putting a lock around the _currentJob assignment/check would make everything work just fine. But is there a lock-free way to solve it?

Update

I'm using a socket and its SendAsync method to send the information, which means that I do not know whether there is a write/job pending when the Send() method is invoked.
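
For context, a minimal sketch of how the `SendAsync` wiring might look is below. The `_socket` field, the constructor hookup, and the `StartWrite` helper are assumptions made for illustration (using `System.Net.Sockets`), not the actual implementation:

private readonly Socket _socket;                 // assumed: the connected socket
private readonly SocketAsyncEventArgs _writeArgs = new SocketAsyncEventArgs();

public SomeClass(Socket socket)
{
    _socket = socket;
    // Route SendAsync completions back into HandleWriteCompleted.
    _writeArgs.Completed += (sender, args) =>
        HandleWriteCompleted(args.SocketError, args.BytesTransferred);
}

// Hypothetical helper: a job fills _writeArgs in Write(), then the send is started here.
private void StartWrite()
{
    // SendAsync returns false when the operation completed synchronously;
    // the Completed event is not raised in that case, so invoke the handler directly.
    if (!_socket.SendAsync(_writeArgs))
    {
        HandleWriteCompleted(_writeArgs.SocketError, _writeArgs.BytesTransferred);
    }
}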

jgauffin
  • Do you need _currentJob at all? Could you not use a BlockingCollection(ConcurrentQueue) and always Add() and Take() / TryTake() from that? It would avoid having to lock _currentJob. – dashton Dec 03 '12 at 13:07
  • @ArthurRaffles: I'm using a socket and its `SendAsync` method. I do not have control over how long each write takes or the thread that it's executed on, which means that I do not know whether there is a write pending when the `Send` method is invoked. – jgauffin Dec 03 '12 at 13:32
  • @jgauffin, btw the lock should be around the `_currentJob` check/assignment and the `_writeQueue` enqueue/dequeue. And it should be a single transaction (i.e. `(check, enqueue or assign)` and `(dequeue, assign)`). – ony Dec 03 '12 at 15:14
  • @ByteBlast: I noticed that the last commit wasn't 100%. I'm going to fix it tomorrow. – jgauffin Jun 02 '13 at 07:38
  • @jgauffin I don't think it necessarily has anything to do with the queue. Using an `AutoResetEvent`, the problem persists. Do you know the source of the problem? – User 12345678 Jun 06 '13 at 06:52

4 Answers


Consider using CompareExchange with a hypothesis about the intended state transitions. There is no need for ConcurrentQueue since we are now in control of our own synchronization.

Updated to use a state machine.
Updated again to remove the unneeded Interlocked.Exchange (for state assignment).

public class SomeClass
{
    private readonly Queue<ISocketWriterJob> _writeQueue = new Queue<ISocketWriterJob>();
    private ISocketWriterJob _currentJob;
    // State transitions: Send() moves Idle -> Active (start now) or Active -> Enqueue -> Active (just queue);
    // HandleWriteCompleted() moves Active -> Dequeue -> Active (next job) or -> Idle (queue empty).
    private enum State { Idle, Active, Enqueue, Dequeue };
    // Stored as an int because Interlocked.CompareExchange has no overload for enum fields.
    private int _state = (int)State.Idle;

    public void Send(ISocketWriterJob job)
    {
        bool spin = true;
        while(spin)
        {
            switch ((State)_state)
            {
            case State.Idle:
                if (Interlocked.CompareExchange(ref _state, (int)State.Active, (int)State.Idle) == (int)State.Idle)
                {
                    spin = false;
                }
                // else consider new state
                break;
            case State.Active:
                if (Interlocked.CompareExchange(ref _state, (int)State.Enqueue, (int)State.Active) == (int)State.Active)
                {
                    _writeQueue.Enqueue(job);
                    _state = (int)State.Active;
                    return;
                }
                // else consider new state
                break;
            case State.Enqueue:
            case State.Dequeue:
                // spin to wait for new state
                Thread.Yield();
                break;
            }
        }

        _currentJob = job;
        _currentJob.Write(_writeArgs);

        // The job is invoked asynchronously here
    }

    private void HandleWriteCompleted(SocketError error, int bytesTransferred)
    {
        // error checks etc removed for this sample.

        if (_currentJob.WriteCompleted(bytesTransferred))
        {
            _currentJob.Dispose();

            bool spin = true;
            while(spin)
            {
                switch ((State)_state)
                {
                case State.Active:
                    if (Interlocked.CompareExchange(ref _state, (int)State.Dequeue, (int)State.Active) == (int)State.Active)
                    {
                        // Note: Queue<T>.TryDequeue needs a newer framework;
                        // on older ones add an equivalent extension method.
                        if (!_writeQueue.TryDequeue(out _currentJob))
                        {
                            // No job left; _currentJob is already null via the out parameter.
                            _state = (int)State.Idle;
                            return;
                        }
                        else
                        {
                            _state = (int)State.Active;
                            spin = false; // exit the loop and start the dequeued job
                        }
                    }
                    // else consider new state
                    break;

                case State.Enqueue:
                    // spin to wait for new state
                    Thread.Yield();
                    break;

                // impossible states
                case State.Idle:
                case State.Dequeue:
                    break;
                }
            }
        }

        _logger.Debug(_writeArgs.GetHashCode() + ": writing more ");
        _currentJob.Write(_writeArgs);

        // the job is invoked asynchronously here.
    }
}
ony
  • I think you need something more here. At the moment, it could go `[Send checks current job, not null]`, `(Completed sets current job to null)`, `(Completed sees there are no new jobs, returns)`, `[Send queues job, returns]`. – Rawling Dec 03 '12 at 13:00
  • @Rawling, agreed, it looks like we need some kind of 3 states: active/enqueue/idle. – ony Dec 03 '12 at 13:09
  • I can just about follow what you're doing here :) – Rawling Dec 03 '12 at 13:54
  • Hmm... I think there is no need to use `Interlocked.Exchange` and it's enough to use a simple `_state = ...`, since all reads will read a synchronized value anyway. Any comments on that? – ony Dec 03 '12 at 14:35
  • The state is only an `int` so I would expect reads/writes to be atomic anyway, so I think you're right - I don't think the `Interlocked.Exchange` calls are necessary. – Rawling Dec 03 '12 at 14:41

At the moment the split between your producer and consumer is a little fuzzy: you have "produce a job into a queue, or consume it immediately" and "consume a job from the queue, or quit if there isn't one". It would be clearer as "produce a job into a queue", "consume a job from the queue (initially)", and "consume a job from the queue (once a job finishes)".

The trick here is to use a BlockingCollection so you can wait for a job to appear:

BlockingCollection<ISocketWriterJob> _writeQueue =
         new BlockingCollection<ISocketWriterJob>();

Let threads calling Send literally just queue a job:

public void Send(ISocketWriterJob job)
{
    _writeQueue.Add(job);
}

Then have another thread that just consumes jobs.

public void StartConsumingJobs()
{
    // Get the first job or wait for one to be queued.
    _currentJob = _writeQueue.Take();

    // Start job
}

private void HandleWriteCompleted(SocketError error, int bytesTransferred)
{
    if (_currentJob.WriteCompleted(bytesTransferred))
    {
        _currentJob.Dispose();

        // Get next job, or wait for one to be queued.
        _currentJob = _writeQueue.Take();
    }

    _currentJob.Write(_writeArgs);

    // Start/continue job as before
}
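
Not part of the original answer, but as a sketch: the `CancellationToken` overload of `Take` (see the comment discussion below) would at least let a blocked wait be released on shutdown; `_shutdown` here is a hypothetical field:

private readonly CancellationTokenSource _shutdown = new CancellationTokenSource();

private ISocketWriterJob TakeNextJob()
{
    // Blocks until a job is available, or throws OperationCanceledException
    // once _shutdown.Cancel() is called, releasing the waiting thread.
    return _writeQueue.Take(_shutdown.Token);
}
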
Rawling
  • You are blocking the callback thread to take from the queue. That may harm the async I/O infrastructure. – ony Dec 03 '12 at 14:18
  • @ony I don't think this will block in the middle of a socket job, if that's what you mean by io; it should only block once all queued jobs are finished and it's waiting for a new one. – Rawling Dec 03 '12 at 14:21
  • Yes, it will block the thread which called `HandleWriteCompleted` (I assume this is some thread pool that implements async I/O) until the queue is not empty. I.e. if the client stops calling Send, you'll stop releasing the callback thread (no return until `_writeQueue.Take()` finishes). – ony Dec 03 '12 at 14:27
  • Ah I see what you mean - if there's a lull in `Send` calls then there will be a thread stuck waiting for a new one, yes. IMO that's a reasonable price to pay in order to have simple code like this, and you can always use the [`CancellationToken` overload](http://msdn.microsoft.com/en-us/library/dd381908.aspx) for a way to kill it. – Rawling Dec 03 '12 at 14:35
  • That price is stealing a thread from someone who is probably not under your control. If you use `CancellationToken` anyway, you'll have to start your own thread to wait for the next job. I'd suggest changing `StartConsumingJobs()` to use a loop with a condition-wait for job completion. If you are sure that calls to `Send()` will come in bursts with high intensity (load > 1.0), you can start a new thread when the previous job has completed but there is no other job in the queue to process yet. In summary, you need a dedicated thread specifically for pulling the next job and scheduling it. – ony Dec 03 '12 at 14:43

I don't think that you will gain anything from using lock-free techniques. Even with simple locks you'll be able to stay in user mode, because Monitor.Enter/Monitor.Exit spin first and only transition into kernel mode if the wait takes longer.

This means that a lock-based technique will perform as well as any lock-free technique here, because you only lock to store a job in the queue and to take it back out, but you'll have much clearer and more robust code that every developer can understand:

public class SomeClass
{
    // We don't have to use Concurrent collections
    private readonly Queue<ISocketWriterJob> _writeQueue = new Queue<ISocketWriterJob>();
    private readonly object _syncRoot = new object();
    private ISocketWriterJob _currentJob;

    public void Send(ISocketWriterJob job)
    {
        lock(_syncRoot)
        {
            if (_currentJob != null)
            {
                _writeQueue.Enqueue(job);
                return;
            }
            _currentJob = job;
        }

        // Use job instead of shared state
        StartJob(job);
    }

    private void StartJob(ISocketWriterJob job)
    {
        job.Write(_writeArgs);
        // The job is invoked asynchronously here
    }

    private void HandleWriteCompleted(SocketError error, int bytesTransferred)
    {
        ISocketWriterJob currentJob = null;

        // error checks etc removed for this sample.
        lock(_syncRoot)
        {
            // I suppose this operation is pretty fast, as well as Dispose
            if (_currentJob.WriteCompleted(bytesTransferred))
            {
                _currentJob.Dispose();
                // There is no TryDequeue method in Queue<T>,
                // but we can easily add it using an extension method
                if (!_writeQueue.TryDequeue(out _currentJob))
                {
                    // We don't have to set _currentJob to null
                    // because we'll achieve it via the out parameter
                    // _currentJob = null;
                    return;
                }
            }

            // Store the current job for further work
            currentJob = _currentJob;
        }

        StartJob(currentJob);
    }
}
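
The comment in the code above mentions adding TryDequeue to Queue<T> as an extension method; a minimal sketch (my illustration, assuming a framework where Queue<T> has no built-in TryDequeue) could be:

public static class QueueExtensions
{
    // Hypothetical helper mirroring ConcurrentQueue<T>.TryDequeue.
    public static bool TryDequeue<T>(this Queue<T> queue, out T item)
    {
        if (queue.Count > 0)
        {
            item = queue.Dequeue();
            return true;
        }

        item = default(T);
        return false;
    }
}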

Lock-free is an optimization, and like any other optimization you should measure performance first to make sure that you actually have an issue with the simple lock-based implementation; only if that's true should you reach for lower-level techniques like lock-free code. Performance versus maintainability is a classical tradeoff, and you should choose very carefully.

Sergey Teplyakov
  • I think `WriteCompleted` and `Dispose` should be outside of the `lock` (they might actually take most of the useful time this function needs). And why do you introduce the additional `currentJob`? In `StartJob` you already have the shared state `_writeArgs`; why don't you use `_currentJob` too? According to [ConcurrentQueue.TryDequeue](http://msdn.microsoft.com/en-us/library/dd287208.aspx) the value is unspecified if the result is `false`. – ony Dec 04 '12 at 06:06
  • According to the IL (from Mono), `out` is implemented through references (like `&` in C++), so low-level code need not set anything through that ref before exiting. ["calling method is required to assign a value before the method returns"](http://msdn.microsoft.com/en-us/library/t3c3bfhx%28v=vs.80%29.aspx) is a restriction of C#, but not of the rest of the languages which might be used to implement the .NET framework. – ony Dec 04 '12 at 06:19
  • @ony: about the last sentence: every output variable should be definitely assigned per the C# language spec (5.1.6 for example), which means that after calling TryDequeue, _currentJob would have the correct value (if an appropriate value exists in the queue) or would be null. BTW: we're talking about the C# language, not about the platform. Every implementation that conforms to the C# standard would behave exactly the same way regardless of the platform. – Sergey Teplyakov Dec 04 '12 at 07:40
  • Please see the spec on `TryDequeue`. The logical meaning is that `out` is set to something, but in fact it can just leave it as is (without overwriting) and say that it was set to some value which coincidentally equals the original value. That 5.1.6 says that when you define a C# function with `out` you should assign it; public behaviour will not change with that restriction. And still, it's more about the specification of `TryDequeue`: it says that only if the result is `true` does the value received via `out` contain meaningful data. Otherwise it can set it to anything it wants and not break its own spec. – ony Dec 04 '12 at 07:57
  • The result can be found here: https://github.com/jgauffin/griffin.networking/blob/master/Source/Griffin.Networking.Core/SocketWriter.cs – jgauffin Dec 20 '12 at 06:23

You can mark the current job as `volatile`, which should ensure the current thread gets the latest state. Generally, though, locking is favorable.

private volatile ISocketWriterJob _currentJob;
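
`volatile` only guarantees visibility of the latest reference; it does not make the check-then-assign in `Send` atomic. As a sketch (my illustration, not part of this answer, and the race with the completion handler discussed above still needs handling), an atomic version of that check could use Interlocked.CompareExchange on the reference:

public void Send(ISocketWriterJob job)
{
    // Atomically claim the "current job" slot only if it is currently null;
    // otherwise fall back to queueing the job.
    // Note: if _currentJob is declared volatile, passing it by ref triggers
    // compiler warning CS0420; Interlocked supplies the required barriers.
    if (Interlocked.CompareExchange(ref _currentJob, job, null) != null)
    {
        _writeQueue.Enqueue(job);
        return;
    }

    job.Write(_writeArgs);
}
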
Steve Py
  • Thumbs up for locking. But volatile cannot be used with complex types. [Explanation](http://msdn.microsoft.com/en-us/library/x13ttww7%28v=vs.80%29.aspx) – DHN Dec 03 '12 at 12:34
  • @DHN `The volatile keyword can be applied to fields of these types: ... Reference types.`? But anway, [`volatile` isn't suitable for locking](http://stackoverflow.com/questions/154551/volatile-vs-interlocked-vs-lock). – Rawling Dec 03 '12 at 12:37
  • Well ok, it can be used, but it only ensures that the latest reference will be visible to the thread. At least that's what I expect. Thread safety of the fields of the complex type is not guaranteed. – DHN Dec 03 '12 at 12:41