0

There is an object that can only handle one request at a time, and it takes a little bit of time to process it. After the task is done, it raises an event returning the result. The object is Computer in the following code, and let's say that I cannot change the behaviour of this class.

Now, I want to create a wrapper class to give the clients an impression that they can send a request at any time. The request is now an async method, so that the client can simply await until the result is returned. Of course, the underlying object can process one request at a time, so the wrapper needs to queue the request, and when the "processing done" event arrives, it needs to return the result to the appropriate client. This wrapper class is SharedComputer in the following code.

I think I need to return the value I got from "Place1" at "Place2". What is the recommended practice for this? Don't the BufferBlock/ActionBlock have a mechanism for returning a value?

    static void Main(string[] args)
    {
        SharedComputer pc = new SharedComputer();
        for(int i =0; i<10; i++)
        {
            Task.Factory.StartNew(async() =>
            {
                var r = new Random();
                int randomDelay = r.Next(500, 5000);
                Thread.Sleep(randomDelay);
                int random1 = r.Next(0, 10);
                int random2 = r.Next(0, 10);
                int sum = await pc.Add(random1, random2);
                if(random1 + random2 == sum)
                {
                    Debug.WriteLine($"Got correct answer: {random1} + {random2} = {sum}.");
                }
                else
                {
                    Debug.WriteLine($"Got incorrect answer: {random1} + {random2} = {sum}.");
                }
            });
        }
        System.Console.Read();
    }
}

class SharedComputer
{
    Computer Mainframe= Computer.GetInstance();
    BufferBlock<TwoNumbers> JobQueue = new BufferBlock<TwoNumbers>();
    TaskCompletionSource<int> TCS;

    public SharedComputer()
    {
        Mainframe.Calculated += Mainframe_Calculated;
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1
        };

        var jobProcessor = new ActionBlock<TwoNumbers>(async e =>
        {
            Debug.WriteLine("Starting an adding");
            TCS = new TaskCompletionSource<int>();
            Mainframe.StartAdding(e.A, e.B);
            var res = await TCS.Task; // Place1
            Debug.WriteLine("Got the result."); 
        }, options);

        JobQueue.LinkTo(jobProcessor);
    }

    private void Mainframe_Calculated(object sender, int e)
    {
        TCS.SetResult(e);
    }

    public async Task<int> Add(int a, int b)
    {
        var data = new TwoNumbers()
        {
            A = a,
            B = b
        };
        Debug.WriteLine("Queuing a new adding.");
        JobQueue.Post<TwoNumbers>(data);

        return 1;//Place2: Return the value received at Place1.
    }

    struct TwoNumbers
    {
        public int A;
        public int B;
    }
}

class Computer
{
    static Computer Instance;
    bool IsWorking = false;
    private Computer()
    {
    }

    public static Computer GetInstance()
    {
        if (Instance == null)
            Instance = new Computer();
        return Instance;
    }

    public event EventHandler<int> Calculated;
    public void StartAdding(int a, int b)
    {
        if (IsWorking)
        {
            throw new InvalidOperationException("Already working.");
        }
        IsWorking = true;
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(3000);
            IsWorking = false;
            Calculated(this, a + b);
        });
    }
}
Damn Vegetables
  • 11,484
  • 13
  • 80
  • 135
  • As a side note, your code creates 10 `Random` instances in quick succession, which makes it possible that some of them will be seeded with the same seed. Also the `BufferBlock` is probably redundant. The `ActionBlock` has its own internal input queue. You can see [here](https://stackoverflow.com/questions/21424084/task-sequencing-and-re-entracy/62882637#62882637) an idiomatic way to send work to an `ActionBlock`, and be notified when the work completes (it uses nested tasks instead of `TaskCompletionSource`s). – Theodor Zoulias Mar 25 '21 at 16:56
  • @TheodorZoulias Thanks. You are probably right about the redundancy. The example I had seen used a BufferBlock, so I used it like that, but it seems that ActionBlock has Post() too. And ActionBlock>> does seem to get the behaviour I want; although it is difficult to understand to have a task of a task. – Damn Vegetables Mar 25 '21 at 18:05
  • Yeap, using tasks of tasks is a quite advanced technique. You could achieve the same thing using `TaskCompletionSource`s instead, but the code would be more verbose. You would need to post the `TwoNumbers` **and** a `TaskCompletionSource` to the `ActionBlock`, creating the need for a wrapper type, or use [tuples](https://learn.microsoft.com/en-us/dotnet/csharp/language-reference/builtin-types/value-tuples) as wrappers. – Theodor Zoulias Mar 25 '21 at 18:13

1 Answers1

1

the underlying object can process one request at a time, so the wrapper needs to queue the request, and when the "processing done" event arrives, it needs to return the result to the appropriate client.

So what you need is mutual exclusion. While you can build a mutex from TPL Dataflow and TaskCompletionSource<T>, it's much easier to just use the built-in one, SemaphoreSlim.

IMO it's cleaner to first write an async abstraction, and then add the mutual exclusion. The async abstraction would look like:

public static class ComputerExtensions
{
  public static Task<int> AddAsync(this Computer computer, int a, int b)
  {
    var tcs = new TaskCompletionSource<int>();
    EventHandler<int> handler = null;
    handler = (_, result) =>
    {
      computer.Calculated -= handler;
      tcs.TrySetResult(result);
    };
    computer.Calculated += handler;
    computer.StartAdding(a, b);
  }
}

Once you have the asynchronous API, you can apply asynchronous throttling (or mutual exclusion) easily via SemaphoreSlim:

class SharedComputer
{
  Computer Mainframe = Computer.GetInstance();
  readonly SemaphoreSlim _mutex = new();

  public async Task<int> AddAsync(int a, int b)
  {
    await _mutex.WaitAsync();
    try { return Mainframe.AddAsync(a, b); }
    finally { _mutex.Release(); }
  }
}

On a side note, use Task.Run instead of Task.Factory.StartNew.

Stephen Cleary
  • 437,863
  • 77
  • 675
  • 810