I'm using MassTransit (with RabbitMQ) for message queuing in a C# console application (actually hosted as a Windows Service, using TopShelf), in conjunction with NHibernate.

The flow of our application consists of:

  1. Watching a file share for scanned documents (using a FileSystemWatcher)
  2. Doing some processing on the file (moving it to a new file location, inserting a record into our DB)
  3. Performing OCR on the document and reading it to see if it contains certain words
  4. Modifying our database record to reflect the result of step 3.

From a high-level technical standpoint, these steps are implemented as follows:

  1. Initial processing is done on the thread on which the FileSystemWatcher handles its Created event. Upon completion, a message is published to our queue to perform the OCR and word check.
  2. MassTransit handles the message, creates a new lifetime scope, and instantiates a consumer to handle it.
  3. The consumer performs the OCR with a call to an IOCRService implementation. When that completes, and in this same consumer, we fetch the words that we want to search for from the database, then read the document text to find those words.
  4. A response message is posted back to MassTransit/RabbitMQ, and the consumer of this message modifies our database entry according to whether or not any words were found.

The problem I'm having is in step 3 of the above. This is approximately what our old code looks like:

public class Consumer
{
   public Task Handle(Message message)
   {
      _Ocr.DoOcr(message); //Performed in-process
      var response = DoDirtyWordCheck(message);
      _Publisher.Publish(response);
      return Task.CompletedTask;
   }

   private CheckResponse DoDirtyWordCheck(Message message)
   {
      var wordsToFind = _DB.FindWords();
      return _Checker.SearchForWords(message);
   }
}

The big change I'm making to this flow is that the OCR step is being moved out of process into a micro-service, which is invoked using HttpClient. The new code is mostly the same:

public class Consumer
{
   public async Task Handle(Message message)
   {
      await _Ocr.DoOcr(message); // Calls out to the micro-service and awaits the result; the interface method changed from a void return to a Task return
      var response = DoDirtyWordCheck(message);
      _Publisher.Publish(response);
   }

   private CheckResponse DoDirtyWordCheck(Message message)
   {
      var wordsToFind = _DB.FindWords();  //Fails here
      return _Checker.SearchForWords(message);
   }
}

What I've been finding, however, is that this now often fails on the call to _DB.FindWords(). As it turns out, this call is happening on a different thread from the one on which my lifetime scope was started, which is the thread that made the call to await _Ocr.DoOcr(). Why is that a problem? Because of the fact that NHibernate Sessions are not thread-safe, our DB layer (which is very complicated) ensures that operations can only be performed on the thread on which it was created.

Now, my former understanding of async/await was that no "trickiness" would be done with extra threads, such that I wouldn't have to worry about code necessarily needing to be thread-safe in order to await it.

However, after doing a deep dive into async/await and coming to some understanding about how it works, it seems that some work actually is being done on a ThreadPool thread (whether this is the actual awaited work or the continuation after the await, I'm still not exactly sure), and this has something to do with the fact that in a console application, there is no SynchronizationContext, which determines how this process is done; whereas, in a WPF application, work that is awaited on the UI thread will be necessarily continued on that same UI thread (which is the sort of behavior I was expecting to be exhibited in all contexts).

So, this brings me to my ultimate question: How can I ensure that the code that needs to continue, after my call to await, continues on that same thread?

I understand that my above code flow could be restructured in ways that prevent this, e.g., I could break out the OCR operation and the dirty word check operation into two separate consumers so that each operation is in its own distinct context/lifetime scope, and probably any other number of things. However, any solution that requires this sort of restructuring is problematic to me, in that it seems to point to the fact that async/await is a leakier abstraction than it would seem to be at first glance.

It doesn't seem that this code, which could be library code that might be run under any context, should have to depend on anything having to do with threading models, ONLY due to the fact that one call in this chain is now awaited. Seems that it should "just work". Everything that has to do with lifetimes and scopes and threading for this use case, in my application, is expected to be atomic at the Consumer level, and which we expect to be taken care of (and has been) by the handling built into MassTransit, and our structuring of code around it.

Now, it seems to me that a possible solution exists in the SynchronizationContext that I could be setting (or the TaskScheduler?), which is what takes care of this sort of work in a WPF application, or a WinForms application, but which is not used at all (by default) in a console application. Unfortunately, it seems that it's no longer a common use case to be doing anything in a console application, let alone something that might pose this requirement, so I can't really find any pre-existing and well-tested solutions that do this sort of thing. Also, I don't yet know enough about the deep-down internals of the async-await implementation that I would feel comfortable hand-rolling my own solution to this, due to any unexpected implications.

So, can anyone help me out on this? Are there any problems with my underlying assumptions that are making me think about this in the wrong way? Or is there really a problem with the overall design/structure of the program itself?

Marc Chu

1 Answer

> Why is that a problem? Because of the fact that NHibernate Sessions are not thread-safe, our DB layer (which is very complicated) ensures that operations can only be performed on the thread on which it was created.

I'm not familiar with NHibernate, but my first instinct is to see if a newer version of NHibernate supports async-aware sessions. If that's the case, then your solution can be as simple as a version upgrade.

> However, after doing a deep dive into async/await and coming to some understanding about how it works, it seems that some work actually is being done on a ThreadPool thread (whether this is the actual awaited work or the continuation after the await, I'm still not exactly sure)

I usually recommend people start with my async intro, which attempts to be "everything you need to know to start using async". There are a couple of points described in that post that should clarify a bit:

  1. async methods begin executing synchronously. To put it another way, await Func(); is roughly the same as var task = Func(); await task;. So the awaited method is invoked first and then the task it returns is awaited.
  2. Each await captures a "context", which is the current SynchronizationContext or TaskScheduler. This defaults to the thread pool scheduler if there is no context. This "context" is used to resume the async method once the await completes.
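Both points can be seen in a small sketch. The names here (SyncStartDemo, FuncAsync, RunAsync, and the gate task) are made up for illustration and stand in for a call such as DoOcr; the gate is only there to keep the ordering deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class SyncStartDemo
{
    // Hypothetical stand-in for an awaited method.
    public static async Task FuncAsync(List<string> log, Task gate)
    {
        log.Add("Func: start");   // runs synchronously, before the caller gets the task back
        await gate;               // first incomplete await: control returns to the caller here
        log.Add("Func: end");     // the continuation, resumed via the captured context
    }

    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        var gate = new TaskCompletionSource<bool>();

        // "await FuncAsync(...)" split into its two halves: invoke, then await.
        Task task = FuncAsync(log, gate.Task);
        log.Add("caller: after call, before await");

        gate.SetResult(true);     // lets FuncAsync run its continuation
        await task;               // only here does the caller wait for completion
        log.Add("caller: after await");
        return log;
    }
}
```

The log always begins with "Func: start" followed by "caller: after call, before await", which shows that the awaited method was entered before the caller reached its own await.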

> this has something to do with the fact that in a console application, there is no SynchronizationContext

Yes, that's correct. Console apps do not have a SynchronizationContext, and thus they use the thread pool scheduler by default.
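A minimal sketch of that behavior, assuming a plain thread with no SynchronizationContext (which is what a console app's main thread looks like). ResumeDemo and its methods are hypothetical names, and Task.Delay stands in for the HTTP call to the micro-service.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ResumeDemo
{
    public static async Task<string> DescribeResumeAsync()
    {
        int before = Environment.CurrentManagedThreadId;
        bool hasContext = SynchronizationContext.Current != null;

        await Task.Delay(50);   // stand-in for the awaited out-of-process call

        // With no context captured, this continuation runs on a thread pool thread.
        bool sameThread = before == Environment.CurrentManagedThreadId;
        bool poolThread = Thread.CurrentThread.IsThreadPoolThread;
        return $"context={hasContext}, sameThread={sameThread}, poolThread={poolThread}";
    }

    // Runs the async method from a dedicated (non-pool) thread that has no
    // SynchronizationContext, and blocks that thread until the result is ready.
    public static string RunOnPlainThread()
    {
        string result = "";
        var thread = new Thread(() => result = DescribeResumeAsync().GetAwaiter().GetResult());
        thread.Start();
        thread.Join();
        return result;
    }
}
```

Called this way, the code after the await does not run on the thread that started the method, which is the situation a thread-affine DB layer will object to.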

> work that is awaited on the UI thread will be necessarily continued on that same UI thread (which is the sort of behavior I was expecting to be exhibited in all contexts).

A SynchronizationContext determines where code will run; it does not necessarily resume on the same thread. One counterexample is the pre-Core ASP.NET SynchronizationContext, which could resume on any thread pool thread.

> So, this brings me to my ultimate question: How can I ensure that the code that needs to continue, after my call to await, continues on that same thread?

Provide your own SynchronizationContext. Or just block on the asynchronous method call.
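To give a feel for the first option, here is a rough sketch of a single-threaded SynchronizationContext that queues continuations and runs them on the thread that called Run. SingleThreadContext and SingleThreadDemo are hypothetical names, not part of any library, and the sketch makes simplifying assumptions: it does not override Send, and it does not support work posted after the top-level task has completed (for example from async void methods).

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: a minimal message pump for one thread.
public sealed class SingleThreadContext : SynchronizationContext
{
    private readonly BlockingCollection<(SendOrPostCallback Callback, object State)> _queue =
        new BlockingCollection<(SendOrPostCallback Callback, object State)>();

    // Await continuations arrive here; queue them instead of using the thread pool.
    public override void Post(SendOrPostCallback d, object state) => _queue.Add((d, state));

    // Runs asyncAction and every continuation it posts on the calling thread,
    // blocking that thread until the returned task has completed.
    public static void Run(Func<Task> asyncAction)
    {
        SynchronizationContext previous = Current;
        var context = new SingleThreadContext();
        SetSynchronizationContext(context);
        try
        {
            Task task = asyncAction();
            // Stop the pump once the top-level task is done.
            task.ContinueWith(_ => context._queue.CompleteAdding(), TaskScheduler.Default);

            foreach (var (callback, state) in context._queue.GetConsumingEnumerable())
                callback(state);

            task.GetAwaiter().GetResult();   // rethrow any exception from asyncAction
        }
        finally
        {
            SetSynchronizationContext(previous);
        }
    }
}

public static class SingleThreadDemo
{
    public static bool ResumesOnSameThread()
    {
        bool same = false;
        SingleThreadContext.Run(async () =>
        {
            int before = Environment.CurrentManagedThreadId;
            await Task.Delay(50);   // stand-in for the awaited OCR call
            same = before == Environment.CurrentManagedThreadId;
        });
        return same;
    }
}
```

The second option needs no infrastructure at all: calling GetAwaiter().GetResult() on the returned task, instead of awaiting it, keeps everything on the calling thread at the cost of blocking that thread for the duration of the call.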

> async/await is a leakier abstraction than it would seem to be at first glance.

async/await is not an abstraction. It's syntactic sugar. If it were an abstraction, then yes, it would be incredibly leaky, since in most cases you should go "async all the way".

> Unfortunately, it seems that it's no longer a common use case to be doing anything in a console application, let alone something that might pose this requirement

Console apps are starting to make a comeback with .NET Core. However, it's rare to have a console app that also uses thread-affine components.

> I can't really find any pre-existing and well-tested solutions that do this sort of thing.

You could use one I wrote: AsyncContext (from the Nito.AsyncEx library) can install a SynchronizationContext, run some asynchronous code, and block until it completes.

Stephen Cleary
  • Thanks for all that. Things seem to be working with AsyncContext now; however, I'm not sure if I'm necessarily using it correctly. It would seem to be intended to be used at the highest level possible, i.e. Program.Main(), as per the example usage. My issue is that I don't actually have any async calls in the Console/Service project itself, but only in library code. As this is a Windows service, it simply sits around waiting for messages to be queued up. MassTransit code is what checks the queue and awaits the call to Task IConsumerFactory.Send(), which we implement. Does it make – Marc Chu Aug 03 '20 at 17:50
  • ...sense to call AsyncContext.Run() on the code that I await in this method? As it is library code, it would seem that this should be agnostic of things such as SynchronizationContext. Should I only be calling AsyncContext.Run() if SynchronizationContext.Current is null? It smells a little, but I struggle to think of how else I should be using this. – Marc Chu Aug 03 '20 at 17:54
  • @MarcChu: Calling `Run` once per message would work, although it would not be terribly efficient. I'd say go ahead with that pattern until you find it too inefficient. If the library is intended to be reusable, then you'd want to use `Run` even if `SynchronizationContext.Current` is not null, because there are `SynchronizationContext` implementations that do not guarantee resuming on the same thread (e.g., ASP.NET classic). – Stephen Cleary Aug 03 '20 at 21:16
  • I hate to reawaken this, but I wanted to put our system through its paces. I just converted another API to async, and now I'm getting the same problem as described above, but in our ASP MVC 5 website. I was under the impression that this was caused by the console application's null SynchronizationContext, and that it shouldn't happen in a website. But that seems not to be the case here. I'm currently doing a conversion to ASP.NET Core, and am wondering if I'll see this same behavior. It seems unwieldy to replace every awaited call with a call to AsyncContext.Run(), so I'm hoping that's not my only choice. – Marc Chu Sep 15 '20 at 12:37
  • I should add that this most recent problem is in a completely separate context -- happening directly in a controller action -- and not related to the RabbitMQ consumer code/infrastructure above. It thus seems that I have to come up with a far more generalized solution to this problem. And I'm hoping it doesn't require complete re-working of our DAL. – Marc Chu Sep 15 '20 at 12:55
  • @MarcChu: From what I can tell, NHibernate 5.0 is async-safe, so it can support multiple threads over a session lifetime, as long as it's only one thread *at a time*. – Stephen Cleary Sep 16 '20 at 08:05
  • Yes, this is the conclusion that I've come to, also. The NH async API could never work otherwise. Which means the mere introduction of async-await requires a complete re-work of our DAL, which is thankfully pretty well encapsulated. I appreciate all your assistance in this matter; your blog has been a huge help, as well. – Marc Chu Sep 16 '20 at 11:38