I am taking my first steps with async/await and tasks in .NET, and I am very excited about how easy it is to run things asynchronously! However, at the moment I have to communicate with devices through a SerialPort. Since only one connection is possible at a time, I wrote a few extension methods that run all those calls, which come from different tasks/threads, synchronously and in first-in, first-out order:

public static class Extensions
{
    private readonly static object LockObject = new object();

    public static Task<TResult> RunAfter<TResult>(this Task<TResult> task, ConcurrentQueue<Task> others)
        => (Task<TResult>)task.RunAllSynchronously(others);

    public static Task RunAfter(this Task task, ConcurrentQueue<Task> others)
        => task.RunAllSynchronously(others);

    private static Task RunAllSynchronously(this Task task, ConcurrentQueue<Task> others)
    {
        // ArgumentNullException's single-string constructor expects the parameter name, not a message.
        if (others == null) throw new ArgumentNullException(nameof(others));
        lock (LockObject)
        {
            others.Enqueue(task);
            Task currentTask;
            while (others.TryDequeue(out currentTask))
            {
                currentTask.RunSynchronously();
                if (currentTask == task) break;
            }
        }
        return task;
    }
}

Does this approach seem reasonable, or should such a case be handled differently?

Daniel Fink
  • You're running everything synchronously. Why use tasks at all? I'd suggest reading [this](http://stackoverflow.com/questions/23230375/sample-serial-port-comms-code-using-async-api-in-net-4-5) on how to really take advantage of asynchrony with serial port. – Yuval Itzchakov Aug 10 '15 at 08:40
  • THX, I will definitely read this article! I wonder why I haven't found it already... – Daniel Fink Aug 10 '15 at 09:00
  • Daniel, first of all, you should answer your own question adding an answer, not as part of your question. Secondly, your solution it's similar to the final form of mine. You should either mark mine as the right one or just add your own answer... – Matías Fidemraizer Aug 13 '15 at 16:42
  • Did it, thanks for your advice ;) – Daniel Fink Aug 14 '15 at 07:14

3 Answers

Why do you run them synchronously?

You should run tasks asynchronously and use async and await to execute them one by one:

 Task currentTask;
 while (others.TryDequeue(out currentTask))
 {
      await currentTask;
      if (currentTask == task) break;
 }
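
One caveat worth illustrating, assuming the queued tasks are created with the `Task` constructor (as the `RunSynchronously()` call in the question suggests): such a task is "cold", and `await` by itself does not start it. The sketch below uses a hypothetical `ColdTaskDemo` class and only shows the status change around `Start()`; it is not part of the original answer.

```csharp
using System;
using System.Threading.Tasks;

public static class ColdTaskDemo
{
    // Returns the task's status before it is started and after it has been awaited.
    public static async Task<(TaskStatus Before, TaskStatus After)> RunAsync()
    {
        // A task built with the constructor is "cold": nothing runs yet.
        var task = new Task(() => Console.WriteLine("talking to the device"));
        TaskStatus before = task.Status;

        // Awaiting a cold task would never complete, so it has to be started first.
        task.Start();
        await task;

        return (before, task.Status);
    }

    public static async Task Main()
    {
        var (before, after) = await RunAsync();
        Console.WriteLine($"{before} -> {after}");
    }
}
```

Tasks returned by `Task.Run` or by an `async` method are already started, so for those the plain `await currentTask;` is enough.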

On the other hand, looking at your code, I can't find a reason to use lock (thread synchronization) at all. You synchronize threads against some shared resource (i.e. some object that may or may not be expected to be read/modified by more than one thread). Your method could be reworked to:

private static async Task RunAllAsync(this Task task, ConcurrentQueue<Task> others)
{
    // Design by contract rocks ;)
    // See:  https://msdn.microsoft.com/en-us/library/dd264808(v=vs.110).aspx
    // (Contract lives in System.Diagnostics.Contracts)
    Contract.Requires(task != null);
    Contract.Requires(others != null);

    others.Enqueue(task);

    // See how I've improved your loop. Since ConcurrentQueue.TryDequeue
    // can return false when another thread has emptied the queue in the
    // meantime, the loop keeps trying to dequeue, and it breaks as soon as
    // the dequeued task is the task against which the extension method
    // was called or the concurrent queue has no more items, to prevent a
    // possible infinite loop.
    // currentTask is declared outside the loop body so that the while
    // condition can see it.
    Task currentTask;
    do
    {
        if (others.TryDequeue(out currentTask))
            await currentTask;
    }
    while (currentTask != task && others.Count > 0);

    // An async Task method returns no value, so there is no "return task;" here.
}

Update

OP said:

I have possibly forgotten to say, that the ConcurrentQueue is the resource that should be shared among the threads. I.e. Task.RunAllSynchronously() is called on every new Task (access to SerialPort) and this call could be come from a different thread. Also, I cannot ensure that RunAllSynchronously() is just called, when all currently running (or queued) tasks are finished (I could, but therefore i had to use something like lock outside the extension method, which is not really that nice of having an extension method.

This is why you're using ConcurrentQueue<T>. Thread safety is managed internally. If more than one thread calls ConcurrentQueue<T>.TryDequeue at once, each item is handed to exactly one of them; a call returns false (and sets the out parameter to its default value) only when the queue is already empty by the time it runs. See what MSDN says for this:

ConcurrentQueue handles all synchronization internally. If two threads call TryDequeue at precisely the same moment, neither operation is blocked. When a conflict is detected between two threads, one thread has to try again to retrieve the next element, and the synchronization is handled internally.

TryDequeue tries to remove an element from the queue. If the method is successful, the item is removed and the method returns true; otherwise, it returns false. That happens atomically with respect to other operations on the queue. If the queue was populated with code such as q.Enqueue("a"); q.Enqueue("b"); q.Enqueue("c"); and two threads concurrently try to dequeue an element, one thread will dequeue a and the other thread will dequeue b. Both calls to TryDequeue will return true, because they were both able to dequeue an element. If each thread goes back to dequeue an additional element, one of the threads will dequeue c and return true, whereas the other thread will find the queue empty and will return false.
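
The behaviour described in the quote can be checked with a small sketch. `DequeueDemo` and `DrainWithTwoWorkers` are hypothetical names chosen for illustration: two workers drain the same queue without any `lock`, and every item ends up with exactly one of them.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class DequeueDemo
{
    // Drains the queue from two workers in parallel and returns what each worker received.
    public static string[][] DrainWithTwoWorkers(ConcurrentQueue<string> queue)
    {
        Task<string[]> Worker() => Task.Run(() =>
        {
            var received = new List<string>();
            // TryDequeue returns false only when the queue is empty at that moment.
            while (queue.TryDequeue(out string item))
                received.Add(item);
            return received.ToArray();
        });

        var workers = new[] { Worker(), Worker() };
        return Task.WhenAll(workers).GetAwaiter().GetResult();
    }

    public static void Main()
    {
        var queue = new ConcurrentQueue<string>();
        queue.Enqueue("a");
        queue.Enqueue("b");
        queue.Enqueue("c");

        string[][] perWorker = DrainWithTwoWorkers(queue);

        // Which worker gets which item varies from run to run,
        // but no item is lost and no item is delivered twice.
        Console.WriteLine(string.Join(" | ", perWorker.Select(items => string.Join(",", items))));
    }
}
```

Note that this only shows the queue itself is thread-safe. It does not make the work done with the dequeued items run one at a time.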

Matías Fidemraizer
  • @YuvalItzchakov I was going to edit to point out that I don't find a reason to use `lock` in OP's code.. – Matías Fidemraizer Aug 10 '15 at 08:45
  • @YuvalItzchakov Done! – Matías Fidemraizer Aug 10 '15 at 08:49
  • During trying different things with tasks and async, I noticed, that you cannot use await within a lock block. In this case, I really has to use lock, aren’t I? – Daniel Fink Aug 10 '15 at 08:50
  • @Daniel No need for a lock - you're only working with locals and thread-safe objects. Unless you need to ensure only one task executes at a time, the lock is unnecessary. And if you do need that, use a `SemaphoreSlim` instead. – Luaan Aug 10 '15 at 08:52
  • @Luaan thx. I have possibly forgotten to say, that the ConcurrentQueue is the resource that should be shared among the threads. I.e. Task.RunAllSynchronously() is called on every new Task (access to SerialPort) and this call could be come from a different thread. Also, I cannot ensure that RunAllSynchronously() is just called, when all currently running (or queued) tasks are finished (I could, but therefore i had to use something like lock outside the extension method, which is not really that nice of having an extension method. – Daniel Fink Aug 10 '15 at 08:58
  • @Luaan I just found a way using something like SemaphoreSlim already (https://msdn.microsoft.com/en-us/library/ee789351%28v=vs.110%29.aspx) but for me, using extension methods looks a bit more pretty. – Daniel Fink Aug 10 '15 at 09:02
  • @Daniel The whole point of `ConcurrentQueue` is to ensure a thread-safe queue - it's explicitly designed to work without locks. And careful about `RunSynchronously`, it's not necessarily "run inline". Perhaps writing your own `TaskScheduler` would be a better solution to this? It's not entirely obvious what you're trying to do. – Luaan Aug 10 '15 at 09:06
  • @Daniel I've re-updated the answer. Check the reworked extension method and how `do while` will work better in your scenario ;) – Matías Fidemraizer Aug 10 '15 at 09:16
  • Thanks to both of you! The initially problem was, that I only can allow one access to the SerialPort at once. Therefore, I thought I can handle this by using extension methods to only run one task after another and not parallel. Looking back, implementing an own TaskScheduler seems to be a good idea, as I explicitly want to limit the access to one single task (I don't care of an order - just first in first out). I would upvote your posts, but I am not able to, due to less reputation. – Daniel Fink Aug 10 '15 at 09:18
  • @MatíasFidemraizer thanks ;) Using do while is really a better way =) – Daniel Fink Aug 10 '15 at 09:19
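
The `SemaphoreSlim` suggestion from the comments above could look roughly like the following. This is a minimal sketch, not the approach of either answer: `SerialGate`, `RunAlone` and `FakeDeviceCall` are hypothetical names, and the delay stands in for a real device call. It guarantees that only one operation runs at a time and passes each result back to its caller, but `SemaphoreSlim` does not document a strict first-in, first-out order for waiters.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: lets only one asynchronous operation run at a time.
public sealed class SerialGate
{
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);

    public async Task<TResult> RunAlone<TResult>(Func<Task<TResult>> operation)
    {
        if (operation == null) throw new ArgumentNullException(nameof(operation));

        // Unlike lock, WaitAsync can be awaited, so no thread is blocked while waiting.
        await _gate.WaitAsync().ConfigureAwait(false);
        try
        {
            return await operation().ConfigureAwait(false);
        }
        finally
        {
            _gate.Release();
        }
    }
}

public static class SerialGateDemo
{
    private static int _active;
    private static int _maxActive;

    // Stand-in for a device call; records how many calls overlap.
    private static async Task<int> FakeDeviceCall(int id)
    {
        int now = Interlocked.Increment(ref _active);
        _maxActive = Math.Max(_maxActive, now);
        await Task.Delay(10);
        Interlocked.Decrement(ref _active);
        return id * 2;
    }

    public static async Task<(int[] Results, int MaxActive)> RunAsync()
    {
        var gate = new SerialGate();
        var calls = Enumerable.Range(1, 5)
                              .Select(id => gate.RunAlone(() => FakeDeviceCall(id)));
        int[] results = await Task.WhenAll(calls);
        return (results, _maxActive);
    }

    public static async Task Main()
    {
        var (results, maxActive) = await RunAsync();
        Console.WriteLine($"results: {string.Join(", ", results)}; max concurrent calls: {maxActive}");
    }
}
```

Passing a `Func<Task<TResult>>` instead of a `Task` means the operation is not started before the gate has been entered, which sidesteps the question of cold versus already-running tasks.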
First of all:

You only benefit from async-await if your program has something else to do while your tasks are running.

If your main thread starts a task and then does nothing but wait for that task to finish, your main thread could just as well do the work itself. That would even be faster.

In your example, I can imagine that sending over the serial line is significantly slower than your processing. So I can imagine that while one thread is busy sending data over the serial line, your thread can be busy creating the next data that is to be sent. Or maybe 10 threads are creating data that is to be sent one after another. Of course in the latter case it is not guaranteed in which order the data will be sent.

But let's keep it simple: one thread creates data at its own speed, while another thread independently sends data over the serial line.

This screams for a producer - consumer pattern: one thread is the producer, it produces items that the consumer reads and processes. After a while the producer tells the consumer that no data is to be expected anymore.

The key object in this is System.Threading.Tasks.Dataflow.BufferBlock. See MSDN. The remarks section says that it is distributed via nuget.

BufferBlock<T> implements two interfaces:

  • ITargetBlock<T> for the producer to send its output to
  • ISourceBlock<T> for the consumer to read the input from.

Let's assume you use System.IO.Ports.SerialPort to send your data. Alas, this class itself exposes no async methods (only its underlying BaseStream does), so we have to create one ourselves. Let's assume you want to convert objects of type T into a format that can be sent over the serial line. The code would look like this:

private void Write(T t)
{
    var dataToSend = ConvertToData(t);
    serialPort.Write(dataToSend);
}

Not very async, is it? So let's make an async function of it:

private async Task WriteAsync(T t)
{
    // The method returns a plain Task, so the Task.Run call is awaited rather than returned.
    await Task.Run(() =>
    {
        var dataToSend = ConvertToData(t);
        serialPort.Write(dataToSend);
    });
}

Or you could just call the other write function:

await Task.Run(() => Write(t));

Note: if you make sure there is only one thread that will use this function, you don't have to lock it.

Now that we do have an async function to send objects of type T over the serial line, let's create a producer that will create objects of type T and send them to the bufferblock.

I'll make it async, so the calling thread can do other things while data is being produced:

private BufferBlock<T> bufferBlock = new BufferBlock<T>();

private async Task ProduceAsync()
{
    while (objectsToProcessAvailable())
    {
        T nextObject = GetNextObjectToProcess();
        await bufferBlock.SendAsync(nextObject);
    }
    // nothing to process anymore: mark complete:
    bufferBlock.Complete();
}

The receiving side will be done by a different thread:

private async Task ConsumeAsync()
{
    // as long as there is something to process: fetch it and process it
    while (await bufferBlock.OutputAvailableAsync())
    {
        T nextToProcess = await bufferBlock.ReceiveAsync();
        // use WriteAsync to send to the serial port:
        await WriteAsync(nextToProcess);
    }
    // if here: no more data to process. Return
}

Now all we need is one procedure that creates the two threads and waits until both tasks are finished:

private async Task ProduceConsumeAsync()
{
    var taskProducer = ProduceAsync();
    // while the producer is busy producing, you can start the consumer:
    var taskConsumer = ConsumeAsync();
    // while both tasks are busy, you can do other things,
    // like keep the UI responsive
    // after a while you need to be sure the tasks are finished:
    await Task.WhenAll(new Task[] {taskProducer, taskConsumer});
}

Note: because of the bufferBlock it is no problem that the producer is already producing while the consumer is not started yet.

All we need now is a function that starts the whole thing. If you have an event handler, just declare it async:

private async void OnButton1_clicked(object sender, ...)
{
    await ProduceConsumeAsync();
}

If you have no async function, you have to create a task yourself:

private void MyFunction()
{
    // start produce consume:
    var myTask = Task.Run( () => ProduceConsumeAsync());
    // while the task is running, do other things.
    // when you need the task to finish:
    // MyFunction is not async, so it cannot use await; block until the task is done instead.
    myTask.Wait();
}

More information about the consumer - producer pattern. See MSDN

How to: Implement a Producer-Consumer Dataflow Pattern

Harald Coppoolse
  • Thank you very much! You're totally right with using this pattern. Also thank you for your very detailed description for this scenario! However, I am using NModbus for accessing the serial port. This lib already offers async methods for accessing the serial port. The only thing I have to manage is that only one access is granted at one time. However, this pattern would be very appropriate for NModbus itself to implement first in first out semantics. For my case (on top of NModbus) this pattern wouldn't be suitable, as I do not have a 'base' method like the 'WriteAsync' method in your example. – Daniel Fink Aug 10 '15 at 13:10
  • In your case the only difference would be that you don't need the function Write(T), WriteAsync(T) would call your NModBus async function, or am I seeing things too simple? – Harald Coppoolse Aug 10 '15 at 13:29
  • Well, it's not that simple, as NModbus's write functions has different parameters (not 'only' a message object or something similar) but thats the reason why I sad it would be very appropriate inside NModBus, at the layer where the 'real' Write(object message) function is. Another question that could come up is a return type. With the Producer-Consumer Pattern, there is no easy approach for dealing with return types, or am I wrong? – Daniel Fink Aug 10 '15 at 13:37
  • Do you want a return as a result of the complete processing, or a result per processed item? Let the consumer return the requested result. Remember if a function would return TResult, the async version would return Task<TResult>. After await Task.WhenAll, you can check property Result of taskConsumer which contains the return value TResult. Other method: let the consumer produce the results in a buffer block and let someone consume that output – Harald Coppoolse Aug 10 '15 at 14:04
  • Yes, I meant a return type per processed item. Letting the consumer put his resluts into another BufferBlock could work, but than I have to 'map' in some way the input items (tasks in my case) with the output results. However, I might found a simple solution for my initial problem, see Update. – Daniel Fink Aug 10 '15 at 20:46
After playing with various things I just found a simple solution, which should be sufficient for me and is somewhat similar to the solution of Matías Fidemraizer:

private static ConcurrentQueue<Task> Tasks { get; } = new ConcurrentQueue<Task>();

public async static Task RunAlone(this Task task)
{
    Tasks.Enqueue(task);

    do
    {
        var nextTask = Tasks.First();

        if (nextTask == task)
        {
            nextTask.Start();
            await nextTask;
            Task deletingTask;
            Tasks.TryDequeue(out deletingTask);
            break;
        }
        else
        {
            nextTask.Wait();
        }
    } while (Tasks.Any());
}

public async static Task<TResult> RunAlone<TResult>(this Task<TResult> task)
{
    TResult result = default(TResult);
    Tasks.Enqueue(task);

    do
    {
        var nextTask = Tasks.First();

        if (nextTask == task)
        {
            nextTask.Start();
            result = await (Task<TResult>)nextTask;
            Task deletingTask;
            Tasks.TryDequeue(out deletingTask);
            break;
        }
        else
        {
            nextTask.Wait();
        }
    } while (Tasks.Any());

    return result;
}
Daniel Fink