I'm using MassTransit (with RabbitMQ) for message queuing in a C# console application (actually hosted as a Windows Service using Topshelf), in conjunction with NHibernate.
The flow of our application consists of:
1. Watching a file share for scanned documents (using a FileSystemWatcher)
2. Doing some processing on the file (moving it to a new location and inserting a record into our DB)
3. Performing OCR on the document and reading it to see if it contains certain words
4. Modifying our database record to reflect the result of step 3
At a high level, these steps are implemented as follows:
1. Initial processing is done on the thread on which the FileSystemWatcher handles its Created event. Upon completion, we publish a message to our queue to perform the OCR and word check.
2. MassTransit handles the message, creates a new lifetime scope, and instantiates a consumer to handle it.
3. The consumer performs the OCR with a call to an IOCRService implementation. When that completes, the same consumer fetches the words we want to search for (from the database), then reads the document text to find those words.
4. The consumer publishes a response message back to MassTransit/RabbitMQ, and the consumer of that message updates our database entry according to whether or not any words were found.
The problem I'm having is in step 3 above. This is approximately what our old code looked like:
```csharp
public class Consumer
{
    public Task Handle(Message message)
    {
        _Ocr.DoOcr(message); // Performed in-process
        var response = DoDirtyWordCheck(message);
        _Publisher.Publish(response);
        return Task.CompletedTask;
    }

    private CheckResponse DoDirtyWordCheck(Message message)
    {
        var wordsToFind = _DB.FindWords();
        return _Checker.SearchForWords(message, wordsToFind);
    }
}
```
The big change I'm making to this flow is that the OCR step is being moved out of process into a microservice, which we call using HttpClient. The new code is mostly the same:
```csharp
public class Consumer
{
    public async Task Handle(Message message)
    {
        // Calls out to the microservice and awaits the result;
        // the interface method changed from a void return to a Task return.
        await _Ocr.DoOcr(message);
        var response = DoDirtyWordCheck(message);
        _Publisher.Publish(response);
    }

    private CheckResponse DoDirtyWordCheck(Message message)
    {
        var wordsToFind = _DB.FindWords(); // Fails here
        return _Checker.SearchForWords(message, wordsToFind);
    }
}
```
What I've found, however, is that this now often fails on the call to _DB.FindWords(). As it turns out, that call is happening on a different thread from the one on which my lifetime scope was started, which is also the thread that makes the call to await _Ocr.DoOcr(message).
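A minimal, self-contained way to observe this thread hop outside of MassTransit is to record the managed thread ID before and after an await. This is a sketch under assumptions: `FakeOcrCallAsync` is a hypothetical stand-in for the HttpClient call (it just awaits `Task.Delay`), and it assumes no SynchronizationContext is installed, as in a plain console app or Windows Service.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ThreadHopDemo
{
    // Hypothetical stand-in for the HttpClient call to the OCR microservice.
    private static async Task FakeOcrCallAsync()
    {
        // Task.Delay completes on a timer (thread-pool) thread, much like an I/O completion.
        await Task.Delay(100);
    }

    public static async Task<(int Before, int After)> RunAsync()
    {
        int before = Environment.CurrentManagedThreadId;
        await FakeOcrCallAsync();
        // With no SynchronizationContext, this line runs on a thread-pool thread.
        int after = Environment.CurrentManagedThreadId;
        return (before, after);
    }

    public static void Main()
    {
        Console.WriteLine($"SynchronizationContext.Current is null: {SynchronizationContext.Current == null}");
        var (before, after) = RunAsync().GetAwaiter().GetResult();
        Console.WriteLine($"Thread before await: {before}, after await: {after}");
    }
}
```

Started from the main thread of a console app, the two IDs differ, because the continuation after the await is scheduled on the thread pool rather than returned to the original thread.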
Why is that a problem? Because NHibernate sessions are not thread-safe, our DB layer (which is very complicated) ensures that operations can only be performed on the thread on which the session was created.
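To make the failure mode concrete, here is a hypothetical sketch of the kind of thread-affinity guard described above. `ThreadAffineSession` and `AffinityDemo` are invented for illustration; this is not NHibernate's API or our actual DB layer. The session records the thread that created it and throws if it is used from any other thread, which is exactly what happens once the code after an await resumes on a pool thread.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical thread-affine session: only usable on the thread that created it.
public sealed class ThreadAffineSession
{
    private readonly int _ownerThreadId = Environment.CurrentManagedThreadId;

    public string[] FindWords()
    {
        if (Environment.CurrentManagedThreadId != _ownerThreadId)
        {
            throw new InvalidOperationException(
                $"Session created on thread {_ownerThreadId} but used on thread {Environment.CurrentManagedThreadId}.");
        }
        return new[] { "example" };
    }
}

public static class AffinityDemo
{
    // Returns true if the guard rejects the call made after the await.
    public static async Task<bool> FailsAfterAwaitAsync()
    {
        var session = new ThreadAffineSession();
        session.FindWords();     // same thread as creation: OK
        await Task.Delay(100);   // continuation resumes on a thread-pool thread
        try
        {
            session.FindWords(); // different thread: the guard throws
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    public static void Main()
    {
        bool failed = FailsAfterAwaitAsync().GetAwaiter().GetResult();
        Console.WriteLine($"Session call failed after await: {failed}");
    }
}
```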
Now, my former understanding of async/await was that no "trickiness" would be done with extra threads, so that I wouldn't have to worry about code needing to be thread-safe in order to await it.
However, after a deep dive into how async/await works, it seems that some work actually is being done on a ThreadPool thread (I'm still not sure whether that is the awaited work itself or the continuation after the await). This seems to be related to the fact that a console application has no SynchronizationContext, which is what determines where that work runs. In a WPF application, by contrast, code awaited on the UI thread is always continued on that same UI thread, which is the behavior I had expected in all contexts.
So, this brings me to my ultimate question: How can I ensure that the code that needs to continue, after my call to await, continues on that same thread?
I understand that the code flow above could be restructured in ways that prevent this; e.g., I could split the OCR operation and the dirty word check into two separate consumers so that each operation runs in its own distinct context/lifetime scope, among any number of other options. However, any solution that requires this sort of restructuring is problematic to me, because it suggests that async/await is a leakier abstraction than it appears at first glance.
It doesn't seem right that this code, which could be library code running under any context, should have to depend on anything to do with threading models ONLY because one call in this chain is now awaited. It seems like it should "just work". In my application, everything to do with lifetimes, scopes and threading for this use case is expected to be atomic at the consumer level, and we expect it to be taken care of (as it has been so far) by the handling built into MassTransit and the way we've structured our code around it.
Now, it seems to me that a possible solution lies in the SynchronizationContext that I could be setting (or the TaskScheduler?), which is what takes care of this sort of work in a WPF or WinForms application, but which is not used at all (by default) in a console application. Unfortunately, doing anything in a console application no longer seems to be a common use case, let alone something with this requirement, so I can't find any pre-existing, well-tested solutions that do this sort of thing. Also, I don't yet know enough about the deep-down internals of async/await to feel comfortable hand-rolling my own solution, given the risk of unexpected implications.
So, can anyone help me out on this? Are there any problems with my underlying assumptions that are making me think about this in the wrong way? Or is there really a problem with the overall design/structure of the program itself?