There is an object that can handle only one request at a time, and each request takes some time to process. When the work is done, the object raises an event that carries the result. This object is the Computer class
in the following code, and let's say that I cannot change the behaviour of this class.
Now I want to create a wrapper class that gives clients the impression that they can send a request at any time. A request is made through an async method, so the client can simply await it until the result is returned. Because the underlying object can process only one request at a time, the wrapper needs to queue the requests, and when the "processing done" event arrives, it needs to return the result to the client that made the corresponding request. This wrapper is the SharedComputer class
in the following code.
I think I need to return the value I got at "Place1" from "Place2". What is the recommended practice for this? Do BufferBlock/ActionBlock have a mechanism for returning a value?
/// <summary>
/// Demo driver: starts ten concurrent clients that each wait a random time, ask the
/// shared computer to add two random digits, and log whether the returned sum matches.
/// Blocks on console input so the background work has time to finish.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
{
    SharedComputer pc = new SharedComputer();

    // Random is not thread-safe, and on runtimes where the default seed is time-based,
    // generators created back-to-back can yield identical sequences. Draw one seed per
    // client here, on the main thread, so every client gets its own sequence.
    var seeder = new Random();

    for (int i = 0; i < 10; i++)
    {
        int seed = seeder.Next();

        // Task.Run understands async lambdas (no nested Task<Task>), unlike Task.Factory.StartNew.
        Task.Run(async () =>
        {
            var r = new Random(seed);
            int randomDelay = r.Next(500, 5000);

            // Wait without blocking a thread-pool thread.
            await Task.Delay(randomDelay);

            int random1 = r.Next(0, 10);
            int random2 = r.Next(0, 10);
            int sum = await pc.Add(random1, random2);
            if (random1 + random2 == sum)
            {
                Debug.WriteLine($"Got correct answer: {random1} + {random2} = {sum}.");
            }
            else
            {
                Debug.WriteLine($"Got incorrect answer: {random1} + {random2} = {sum}.");
            }
        });
    }

    System.Console.Read();
}
}
/// <summary>
/// Wraps the one-request-at-a-time <see cref="Computer"/> so that any number of callers
/// can request an addition at any moment. Requests are queued and forwarded to the
/// computer one by one; each caller's task completes with the result of its own request.
/// </summary>
class SharedComputer
{
    readonly Computer Mainframe = Computer.GetInstance();
    readonly BufferBlock<TwoNumbers> JobQueue = new BufferBlock<TwoNumbers>();

    // Completion source for the addition currently running on the mainframe.
    // Assigned by the job processor before StartAdding; completed by Mainframe_Calculated.
    TaskCompletionSource<int> TCS;

    /// <summary>
    /// Subscribes to the computer's result event and wires the job queue to a
    /// processor that handles one queued request at a time.
    /// </summary>
    public SharedComputer()
    {
        Mainframe.Calculated += Mainframe_Calculated;
        var options = new ExecutionDataflowBlockOptions
        {
            // The computer throws if a request arrives while it is working,
            // so jobs must never overlap.
            MaxDegreeOfParallelism = 1
        };
        var jobProcessor = new ActionBlock<TwoNumbers>(async e =>
        {
            try
            {
                Debug.WriteLine("Starting an adding");
                var pending = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
                TCS = pending;
                Mainframe.StartAdding(e.A, e.B);
                var res = await pending.Task; // Place1
                Debug.WriteLine("Got the result.");

                // Hand the value to the caller that queued this particular job.
                e.Completion.TrySetResult(res);
            }
            catch (Exception ex)
            {
                // Report the failure to the affected caller only; letting it escape would
                // fault the block and leave every later request waiting forever.
                e.Completion.TrySetException(ex);
            }
        }, options);
        JobQueue.LinkTo(jobProcessor);
    }

    /// <summary>
    /// Handles the computer's result event by completing the in-flight operation.
    /// </summary>
    /// <param name="sender">The object that raised the event.</param>
    /// <param name="e">The calculated sum.</param>
    private void Mainframe_Calculated(object sender, int e)
    {
        // Ignore events that arrive when no job is pending or the job was already completed.
        var pending = TCS;
        if (pending != null)
        {
            pending.TrySetResult(e);
        }
    }

    /// <summary>
    /// Queues an addition and asynchronously waits for its result.
    /// </summary>
    /// <param name="a">First operand.</param>
    /// <param name="b">Second operand.</param>
    /// <returns>A task that completes with the sum reported by the computer for this request.</returns>
    /// <exception cref="InvalidOperationException">The job queue did not accept the request.</exception>
    public async Task<int> Add(int a, int b)
    {
        var data = new TwoNumbers()
        {
            A = a,
            B = b,
            // Asynchronous continuations keep the caller's code off the job processor's thread.
            Completion = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously)
        };
        Debug.WriteLine("Queuing a new adding.");
        if (!JobQueue.Post<TwoNumbers>(data))
        {
            throw new InvalidOperationException("The job queue did not accept the request.");
        }
        return await data.Completion.Task; // Place2: the value received at Place1.
    }

    /// <summary>
    /// A queued addition request: the two operands plus the completion source
    /// through which the result is delivered back to the caller of Add.
    /// </summary>
    struct TwoNumbers
    {
        public int A;
        public int B;
        // Reference type, so every copy of this struct shares the same completion source.
        public TaskCompletionSource<int> Completion;
    }
}
/// <summary>
/// Singleton adder that accepts a single request at a time and reports each
/// sum through the <see cref="Calculated"/> event.
/// </summary>
class Computer
{
    // The one shared instance, created on the first GetInstance call.
    static Computer Instance;

    // True from the moment a request is accepted until just before its result is raised.
    bool IsWorking;

    // Private so that instances can only be obtained through GetInstance.
    private Computer() { }

    /// <summary>Returns the shared instance, creating it on first use.</summary>
    public static Computer GetInstance()
    {
        return Instance ?? (Instance = new Computer());
    }

    /// <summary>Raised from a background task with the sum of a finished request.</summary>
    public event EventHandler<int> Calculated;

    /// <summary>
    /// Starts adding <paramref name="a"/> and <paramref name="b"/> in the background;
    /// the sum is delivered later through <see cref="Calculated"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">A previous request is still in progress.</exception>
    public void StartAdding(int a, int b)
    {
        if (IsWorking)
        {
            throw new InvalidOperationException("Already working.");
        }

        IsWorking = true;

        Action work = () =>
        {
            // Simulated processing time.
            Thread.Sleep(3000);

            // The busy flag is cleared before the event is raised, so a handler
            // may start the next request right away.
            IsWorking = false;
            Calculated(this, a + b);
        };
        Task.Factory.StartNew(work);
    }
}