2

Background

We have a service operation that can receive concurrent asynchronous requests and must process those requests one at a time.

In the following example, the UploadAndImport(...) method receives concurrent requests on multiple threads, but its calls to the ImportFile(...) method must happen one at a time.

Layperson Description

Imagine a warehouse with many workers (multiple threads). People (clients) can send the warehouse many packages (requests) at the same time (concurrently). When a package comes in a worker takes responsibility for it from start to finish, and the person who dropped off the package can leave (fire-and-forget). The workers' job is to put each package down a small chute, and only one worker can put a package down a chute at a time, otherwise chaos ensues. If the person who dropped off the package checks in later (polling endpoint), the warehouse should be able to report on whether the package went down the chute or not.

Question

The question then is how to write a service operation that...

  1. can receive concurrent client requests,
  2. receives and processes those requests on multiple threads,
  3. processes requests on the same thread that received the request,
  4. processes requests one at a time,
  5. is a one way fire-and-forget operation, and
  6. has a separate polling endpoint that reports on request completion.

We've tried the following and are wondering two things:

  1. Are there any race conditions that we have not considered?
  2. Is there a more canonical way to code this scenario in C#.NET with a service oriented architecture (we happen to be using WCF)?

Example: What We Have Tried

This is the service code that we have tried. It works, though it feels like something of a hack or kludge.

static ImportFileInfo _inProgressRequest = null;

static readonly ConcurrentDictionary<Guid, ImportFileInfo> WaitingRequests = 
    new ConcurrentDictionary<Guid, ImportFileInfo>();

public void UploadAndImport(ImportFileInfo request)
{
    // Receive the incoming request
    WaitingRequests.TryAdd(request.OperationId, request);

    while (null != Interlocked.CompareExchange(ref _inProgressRequest, request, null))
    {
        // Wait for any previous processing to complete
        Thread.Sleep(500);
    }

    // Process the incoming request
    ImportFile(request);

    Interlocked.Exchange(ref _inProgressRequest, null);
    WaitingRequests.TryRemove(request.OperationId, out _);
}

public bool UploadAndImportIsComplete(Guid operationId) => 
    !WaitingRequests.ContainsKey(operationId);

This is example client code.

private static async Task UploadFile(FileInfo fileInfo, ImportFileInfo importFileInfo)
{
    using (var proxy = new Proxy())
    using (var stream = new FileStream(fileInfo.FullName, FileMode.Open, FileAccess.Read))
    {
        importFileInfo.FileByteStream = stream;
        proxy.UploadAndImport(importFileInfo);
    }

    await Task.Run(() => Poller.Poll(timeoutSeconds: 90, intervalSeconds: 1, func: () =>
    {
        using (var proxy = new Proxy())
        {
            return proxy.UploadAndImportIsComplete(importFileInfo.OperationId);
        }
    }));
}
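
The `Poller` helper is not shown in the question. Here is a minimal sketch of what it might look like, assuming it calls `func` repeatedly until it returns true or the timeout elapses. Only the call shape (`timeoutSeconds`, `intervalSeconds`, `func`) comes from the client code above; the body is hypothetical.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical stand-in for the Poller used by the client; the real one is not shown.
public static class Poller
{
    // Calls func until it returns true or timeoutSeconds have elapsed.
    // Returns true if func succeeded, false if the timeout was reached first.
    public static bool Poll(int timeoutSeconds, int intervalSeconds, Func<bool> func)
    {
        var timeout = TimeSpan.FromSeconds(timeoutSeconds);
        var stopwatch = Stopwatch.StartNew();
        while (true)
        {
            if (func())
            {
                return true;
            }
            if (stopwatch.Elapsed >= timeout)
            {
                return false;
            }
            // Wait before asking the service again.
            Thread.Sleep(TimeSpan.FromSeconds(intervalSeconds));
        }
    }
}
```

Because `func` is always called at least once, a timeout of zero still performs a single check.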

It's hard to write a minimum viable example of this in a Fiddle, but here is a start that gives a sense of it and that compiles.

As before, the above seems like a hack/kludge, and we are asking both about potential pitfalls in its approach and for alternative patterns that are more appropriate/canonical.

Shaun Luttin
  • You say that it must handle the request on a separate thread but completed on the original thread. What is the difference between handling and completing? – John Wu May 05 '18 at 00:14
  • What I mean is that the same thread that receives a request must also complete the request. – Shaun Luttin May 05 '18 at 05:21
  • Please define “complete” and tell me how it is different from “handle,” in the context of your problem. Also, if completion happens on the same thread, what is the purpose of having an endpoint for polling the status? – John Wu May 05 '18 at 05:34
  • @JohnWu I've gone ahead and replaced "complete" and "handle" with the word "process" to make the situation clearer. – Shaun Luttin May 05 '18 at 05:44
  • @JohnWu The reason for having polling is that the service operation is a one way, fire-and-forget operation. – Shaun Luttin May 05 '18 at 05:46
  • Considering the context you gave I think a WCF service is good. Instead of polling for the progress I would use socket based solution like SignalR. This enables for real-time updates. The server can now send data like progress to the client at any time. Connection is permanent. Client subscribes to a server side message hub by registering a callback. It's pretty simple, SignalR. Regarding race conditions in your example: Your "ImportFile()" requires synchronization since it is not thread safe as you mentioned. – BionicCode May 05 '18 at 06:43
  • You said requests must be synchronized because ImportFile is not thread safe. But in your requirement list you want each incoming asynchronous request to be processed in parallel (requirement 2)? Mistake? – BionicCode May 05 '18 at 06:52
  • @ShaunLuttin Polling is not fire-and-forget. You have to wait and ask repeatedly if you are allowed to invoke a method. Event based communication is fire-and-forget, since you trigger the action and can forget it. The event sender will invoke your callback when he is ready. No waiting. – BionicCode May 05 '18 at 07:40
  • @BionicCode You said, "Instead of polling for the progress I would use socket based solution like SignalR." Me too. That said, one requirement of the task is for `UploadAndImport` to be a one-way, fire-and-forget operation. – Shaun Luttin May 05 '18 at 13:33
  • @BionicCode You said, "But in your requirement list you want each incoming asynchronous request to be processed in parallel..." That's not quite right... we want each incoming request to be *received* in parallel and then *processed* one at a time. – Shaun Luttin May 05 '18 at 13:36
  • @BionicCode You said, "Polling is not fire-and-forget." Right. We need `UploadAndImport` to be a fire-and-forget operation. The polling endpoint, though, can be two-way. – Shaun Luttin May 05 '18 at 13:39
  • @BionicCode You said, "Event based communication is fire-and-forget, since you trigger the action and can forget it. The event sender will invoke your callback when he is ready. No waiting." Right. Our requirements, though, are 1. not to use a callback for UploadAndImport, 2. instead to make UploadAndImport a one-way, fire-and-forget operation, and 3. to expose an endpoint at which the client can ask whether the operation completed. – Shaun Luttin May 05 '18 at 14:03

3 Answers

2

Simple solution using Producer-Consumer pattern to pipe requests in case of thread count restrictions.

You still have to implement a simple progress reporter or event. I suggest replacing the expensive polling approach with asynchronous communication, as offered by Microsoft's SignalR library. It uses WebSocket (where available) to enable async behavior. The client and server can register their callbacks on a hub. Using RPC, the client can then invoke server-side methods and vice versa. You would post progress to the client by using the hub (client side). In my experience SignalR is very simple to use and very well documented. Client libraries exist for other platforms as well (e.g. Java).

Polling, in my understanding, is the total opposite of fire-and-forget. You can't forget, because you have to check something at an interval. Event-based communication, like SignalR, is fire-and-forget since you fire and will get a reminder (because you forgot). The "event side" will invoke your callback instead of you waiting to do it yourself!

Requirement 5 is ignored since no reason was given for it. Waiting for a thread to complete would eliminate the fire-and-forget character.

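private BlockingCollection<ImportFileInfo> requestQueue = new BlockingCollection<ImportFileInfo>();
private bool isServiceEnabled;
// const, so that it can be used in the field initializer below
private const int maxNumberOfThreads = 8;
// Semaphore requires both an initial count and a maximum count
private readonly Semaphore semaphore = new Semaphore(maxNumberOfThreads, maxNumberOfThreads);
private readonly object syncLock = new object();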

public void UploadAndImport(ImportFileInfo request) 
{            
  // Start the request handler background loop
  if (!this.isServiceEnabled)
  {
    this.requestQueue?.Dispose();
    this.requestQueue = new BlockingCollection<ImportFileInfo>();

    // Fire and forget (requirement 4)
    Task.Run(() => HandleRequests());
    this.isServiceEnabled = true;
  }

  // Cache multiple incoming client requests (requirement 1) (and enable throttling)
  this.requestQueue.Add(request);
}

private void HandleRequests()
{
  while (!this.requestQueue.IsCompleted)
  {
    // Wait while thread limit is exceeded (some throttling)
    this.semaphore.WaitOne();

    // Process the incoming requests in a dedicated thread (requirement 2) until the BlockingCollection is marked completed.
    Task.Run(() => ProcessRequest());
  }

  // Reset the request handler after BlockingCollection was marked completed
  this.isServiceEnabled = false;
  this.requestQueue.Dispose();
}

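private void ProcessRequest()
{
  try
  {
    // Take() blocks until a request is available
    ImportFileInfo request = this.requestQueue.Take();
    UploadFile(request);

    // You updated your question saying the method "ImportFile()" requires synchronization.
    // This is a bottleneck and will significantly drop performance when this method is long running.
    lock (this.syncLock)
    {
      ImportFile(request);
    }
  }
  catch (InvalidOperationException)
  {
    // Take() throws once the collection is marked completed and is empty; nothing left to do
  }
  finally
  {
    // Always free the slot, even if the request failed
    this.semaphore.Release();
  }
}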

Remarks:

  • BlockingCollection implements IDisposable
  • TODO: You have to "close" the BlockingCollection by marking it completed with "BlockingCollection.CompleteAdding()", or the handler will loop indefinitely waiting for further requests. You could introduce additional request methods that let the client cancel or update the process and mark adding to the BlockingCollection as completed. Or use a timer that waits for an idle period before marking it as completed. Or make your request handler thread block or spin.
  • Replace Take() and Add(...) with TryTake(...) and TryAdd(...), or use the overloads that accept a CancellationToken, if you want timeout or cancellation support
  • Code is not tested
  • Your "ImportFile()" method is a bottleneck in your multi-threading environment. I suggest making it thread safe. In case of I/O that requires synchronization, I would cache the data in a BlockingCollection and then write it to I/O one item at a time.
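
The last remark could be sketched roughly as follows: many threads add items to a BlockingCollection and a single consumer task handles them one by one. `SerializedWorker` and its members are hypothetical names for illustration. Note that the work then runs on the consumer's thread, not on the thread that received the request, so this only fits if that requirement can be relaxed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Sketch: many producers enqueue items, one consumer handles them in order.
public sealed class SerializedWorker<T> : IDisposable
{
    private readonly BlockingCollection<T> queue = new BlockingCollection<T>();
    private readonly Task consumer;

    public SerializedWorker(Action<T> handle)
    {
        // A single long-running consumer guarantees one-at-a-time processing.
        consumer = Task.Factory.StartNew(() =>
        {
            // Blocks while the queue is empty; the loop ends after CompleteAdding()
            // has been called and the remaining items have been consumed.
            foreach (T item in queue.GetConsumingEnumerable())
            {
                handle(item);
            }
        }, TaskCreationOptions.LongRunning);
    }

    // Safe to call from any number of threads concurrently.
    public void Enqueue(T item) => queue.Add(item);

    // Stops accepting items and waits for the queue to drain.
    // If handle threw, Wait() rethrows it wrapped in an AggregateException.
    public void Dispose()
    {
        queue.CompleteAdding();
        consumer.Wait();
        queue.Dispose();
    }
}
```

Usage would be along the lines of `new SerializedWorker<ImportFileInfo>(ImportFile)` created once per service, with each request calling `Enqueue(request)` after its upload step.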
BionicCode
  • Thank you for pointing me in the direction of Producer-Consumer, giving an example, and providing some gotchas. That gives me an alternative approach to investigate. – Shaun Luttin May 05 '18 at 06:13
  • @Shaun Luttin Okay, I think you edited your question. Let me update my example to synchronize the calls on ImportFile() by introducing a pipe-like chain of BlockingCollections. But UploadFile() is intended to run in parallel? – BionicCode May 05 '18 at 06:32
  • Yes. Apologies for editing my question. I can roll it back if that seems appropriate. You asked about whether `UploadFile()` is supposed to be in parallel. I've given the question more of a think, and it comes down to this: `UploadAndImport` has two parts. Part one can be in parallel, part two must be synchronized. – Shaun Luttin May 05 '18 at 13:54
  • In case you're interested, I have forked my original .NET Fiddle with a version that I think is more clear: https://dotnetfiddle.net/VUzdXR The in-memory queue still has the brittleness that @JohnWu mentioned will occur if we're receiving tons of requests. – Shaun Luttin May 05 '18 at 14:26
  • @ShaunLuttin I don't know anything about your environment. If you are expecting "tons of requests" then you would encounter more serious problems than hardware resources like memory. Your response time would be too long, leaving your clients waiting. You must know in advance which load to expect and design your system accordingly. When the execution time of processing each request is too high, then you have a bottleneck here. You could then host a number of services in parallel and use a reverse proxy to balance the load. Or you could buy a bigger CPU. Scale horizontally or vertically. – BionicCode May 06 '18 at 10:56
  • @ShaunLuttin Do you expect the request data to be big? E.g. WCF handles request queuing for you. But you need some storage for your data. You could add dedicated servers, or you could use file-system-based storage (e.g. a flat-file database) or a cache. This is a question of memory, like HDD vs RAM. Using a database as a request queue, as suggested, is not the best idea, maybe even a bad one. It makes more sense to scale the system in order to process requests faster than to scale the request queue. – BionicCode May 06 '18 at 11:06
  • @ShaunLuttin Regarding your polling problem, I think it would be better to use something like a database system to solve your progress problem. According to your example, a person who wants to track the state of items or a process would usually need some ID to relate a process to a person. He would then query a database to retrieve the desired information. If you want a real-time progress report, e.g. for a file upload, you need some kind of session-based system and a shared, maybe hash-table-based, context. Incoming requests must identify themselves by an ID, and each ID has a progress. – BionicCode May 06 '18 at 11:20
  • @ShaunLuttin If your API is supposed to be RESTful, then storing session data would violate your constraints. But real-time client-server communication is properly done by using e.g. WebSocket or anything that allows full-duplex. And continuously writing to the database to store upload progress while simultaneously reading the progress introduces more problems, because you said you are expecting "tons of requests". – BionicCode May 06 '18 at 11:27
  • You said, "If you['re] expecting "tons of requests" then you would..." So, we're not expecting tons of requests. That's just a potential risk that John Wu raised. – Shaun Luttin May 07 '18 at 15:45
  • You asked, "Do you expect the request data to be big?" The request data will not be very big. It will predominantly be a PDF file and some metadata associated with it. – Shaun Luttin May 07 '18 at 15:47
  • You said, "Regarding your polling problem, I think it would be better to use something like a database system..." That's a terrific idea; I'm surprised I didn't think of that earlier. – Shaun Luttin May 07 '18 at 15:48
  • You said, "If your API is supposed to be RESTful, then storing session data would violate your constraints." That is also a really good point. – Shaun Luttin May 07 '18 at 15:49
  • Marked as answer because of the reference to the producer-consumer pattern. – Shaun Luttin May 07 '18 at 17:07
1

The problem is that your total bandwidth is very small (only one job can run at a time), yet you want to handle parallel requests. That means that queue time could vary wildly. Implementing your job queue in memory may not be the best choice, as it would make your system much more brittle and more difficult to scale out when your business grows.

A traditional, scalable way to architect this would be:

  • An HTTP service to accept requests, load balanced/redundant, with no session state.
  • A SQL Server database to persist the requests in a queue, returning a persistent unique job ID.
  • A Windows service to process the queue, one job at a time, and mark jobs as complete. The worker process for the service would probably be single-threaded.

This solution requires you to choose a web server. A common choice is IIS running ASP.NET. On that platform, each request is guaranteed to be handled in a single-threaded manner (i.e. you don't need to worry about race conditions too much). Due to a feature called thread agility, the request might end on a different thread than it started on, but it stays in the original synchronization context, which means you will probably never notice unless you are debugging and inspecting thread IDs.
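
To make the three roles above more concrete, here is a rough sketch. `IJobStore`, `InMemoryJobStore`, and `Worker` are hypothetical names; the in-memory store is only a stand-in so the example runs, and in the design above it would be backed by a SQL Server table.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical job store; in the design above this would be a database table.
public interface IJobStore
{
    Guid Enqueue(string payload);                         // called by the HTTP service
    bool TryDequeue(out Guid jobId, out string payload);  // called by the worker
    void MarkComplete(Guid jobId);                        // called by the worker
    bool IsComplete(Guid jobId);                          // backs the polling endpoint
}

// In-memory stand-in so the sketch runs; it is NOT durable.
public sealed class InMemoryJobStore : IJobStore
{
    private readonly ConcurrentQueue<(Guid Id, string Payload)> pending =
        new ConcurrentQueue<(Guid Id, string Payload)>();
    private readonly ConcurrentDictionary<Guid, bool> completed =
        new ConcurrentDictionary<Guid, bool>();

    public Guid Enqueue(string payload)
    {
        var id = Guid.NewGuid();
        completed[id] = false;
        pending.Enqueue((id, payload));
        return id; // the client keeps this ID for polling
    }

    public bool TryDequeue(out Guid jobId, out string payload)
    {
        bool found = pending.TryDequeue(out var job);
        jobId = job.Id;
        payload = job.Payload;
        return found;
    }

    public void MarkComplete(Guid jobId) => completed[jobId] = true;

    // Unknown IDs are reported as not complete.
    public bool IsComplete(Guid jobId) =>
        completed.TryGetValue(jobId, out bool done) && done;
}

public static class Worker
{
    // Single-threaded drain: handles whatever is queued, one job at a time,
    // and returns the number of jobs processed.
    public static int DrainOnce(IJobStore store, Action<string> process)
    {
        int handled = 0;
        while (store.TryDequeue(out Guid id, out string payload))
        {
            process(payload);
            store.MarkComplete(id);
            handled++;
        }
        return handled;
    }
}
```

A Windows service would call something like `DrainOnce` in a loop with a short delay between passes. Since only that one loop ever calls `process`, no extra locking is needed around the import step.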

John Wu
0

Given the constraints of our system, this is the implementation we ended up using:

static ImportFileInfo _importInProgressItem = null;

static readonly ConcurrentQueue<ImportFileInfo> ImportQueue = 
    new ConcurrentQueue<ImportFileInfo>();

public void UploadAndImport(ImportFileInfo request) {
    UploadFile(request);
    ImportFileSynchronized(request);
}

// Synchronize the file import, 
// because the database allows a user to perform only one write at a time.
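private void ImportFileSynchronized(ImportFileInfo request) {
    ImportQueue.Enqueue(request);
    do {
        if (null != Interlocked.CompareExchange(ref _importInProgressItem, request, null)) {
            // Queue processing is already under way in another thread.
            return;
        }

        try {
            // Peek only after claiming the slot, so that no other thread can
            // dequeue the head between the peek and the import.
            while (ImportQueue.TryPeek(out var next)) {
                try {
                    ImportFile(next);
                }
                finally {
                    // Remove the item even if the import throws.
                    ImportQueue.TryDequeue(out _);
                }
            }
        }
        finally {
            // Release the slot even on failure, so later requests are not blocked.
            Interlocked.Exchange(ref _importInProgressItem, null);
        }
    }
    while (ImportQueue.Any());
}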

public bool UploadAndImportIsComplete(Guid operationId) =>
    ImportQueue.All(waiting => waiting.OperationId != operationId);

This solution works well for the loads we are expecting. That load involves a maximum of about 15-20 concurrent PDF file uploads. The batch of up to 15-20 files tends to arrive all at once and then to go quiet for several hours until the next batch arrives.

Criticism and feedback are most welcome.
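
For comparison, here is a sketch of a simpler shape for the same idea: a plain `lock` serializes the import on the calling thread, and a dictionary tracks completion for the polling endpoint. `ImportGate` and its members are hypothetical names, and the import step is passed in as a delegate to keep the example self-contained. Note that `lock` does not guarantee first-come-first-served ordering among waiting threads.

```csharp
using System;
using System.Collections.Concurrent;

// Sketch only: serializes an import step and tracks completion per operation ID.
public sealed class ImportGate
{
    private readonly object importLock = new object();
    private readonly ConcurrentDictionary<Guid, bool> completed =
        new ConcurrentDictionary<Guid, bool>();

    // Runs import on the calling thread, never concurrently with another import.
    public void ImportSynchronized(Guid operationId, Action import)
    {
        // Register before waiting, so polling sees "not complete" while queued.
        completed[operationId] = false;

        // Only one thread at a time gets past this point; the lock is
        // released automatically even if import throws.
        lock (importLock)
        {
            import();
        }

        // Reached only if import did not throw.
        completed[operationId] = true;
    }

    // True only for known operations that finished successfully.
    public bool IsComplete(Guid operationId) =>
        completed.TryGetValue(operationId, out bool done) && done;
}
```

One design difference from the queue-based check: an unknown or not-yet-registered operation ID is reported as not complete here, whereas a check of the form "is not in the waiting queue" reports it as complete.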

Shaun Luttin