0

Apologies if this question already has an answer, but if it does, I can't find it on this site.

First, this question is specific to .NET Core (v1.1.0 at the time of writing).

I have a third-party assembly that only handles requests if they all originate on the same thread. The assembly is the RabbitMQ library; details of the problem, which may or may not be relevant, can be found here. Basically, multiple threads in my application could call into this assembly, but if requests originate from different threads, it throws an exception.

So, to get around this, I am attempting to create a thread that blocks so that it never exits, and to somehow have all calls to this assembly processed on that thread.

My first attempt was to create an event and subscribe to it on the blocked thread. Any other thread would then BeginInvoke the event, which I thought might be picked up on the subscribing thread, so that all requests to the third-party assembly would be handled on that single thread.

I now (painfully) understand that it is not possible to BeginInvoke an event in .NET Core :(

Example which demonstrates the problem:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public class Program
{
    public static void Main(string[] args)
    {
        Program program = new Program();
        ManualResetEvent resetEvent = new ManualResetEvent(false);
        program.StartNewThreadToBeCalled(resetEvent);

        program.CallBobMultipleTimes(resetEvent);

        Console.ReadLine();
    }

    private void CallBobMultipleTimes(ManualResetEvent resetEvent)
    {
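        resetEvent.WaitOne();
        // Each work item raises the event on whichever pool thread runs it,
        // so the handler does not run on the subscribing thread.
        for (int i = 0; i < 100; i++)
            ThreadPool.QueueUserWorkItem(x => CallBob(null, null)); // Can't BeginInvoke in .NET Core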
    }

    private void StartNewThreadToBeCalled(ManualResetEvent resetEvent)
    {
        ThreadPool.QueueUserWorkItem(x=>
        {   
            Bob bob = new Bob();

            CallBob += (obj, e)=> bob.InvokeMeOnOneThreadOnly();
            resetEvent.Set();                
            ManualResetEvent mre = new ManualResetEvent(false);
            mre.WaitOne();  // Real implementation will block forever; this should be the only thread that handles requests.
        });
    }

    public event EventHandler CallBob;
}

public class Bob
{
    private List<int> ThreadIdentifiers = new List<int>();
    private static object SyncObject = new object();


    public void InvokeMeOnOneThreadOnly()
    {   
        lock(SyncObject)
        {
            int currentThreadId = Thread.CurrentThread.ManagedThreadId;
            if(ThreadIdentifiers.Any())
            {
                if(!ThreadIdentifiers.Contains(currentThreadId))
                   Console.WriteLine("Don't call me from multiple threads!");
            }
            else
                ThreadIdentifiers.Add(currentThreadId);
        }
    }
}

I have got this working by creating a wrapper around a ConcurrentQueue<T> that raises an event whenever something is added to it. I then handle this event on my special third-party assembly thread and take requests off the queue until it is empty... I'm not sure why this works when the other way wouldn't?!

Is there a better way (than the one I have found) to handle requests from multiple different threads on a single thread in .NET Core?

Jay
  • Since a queue is the standard implementation of the producer-consumer pattern, it is not very surprising that you've got it working... There is no way to know why the "other way" does not work, as there is no clear explanation of that "other way" ("by creating delegates" is not one). If you need an answer, you need to clarify what you've tried for the "other way" or explain in what way you want to improve the queue solution; in its current state the post feels too broad. – Alexei Levenkov Feb 22 '17 at 17:00
  • @Alexei I was about to walk out of work when I posted, so I didn't have time to write a more detailed post. I'll try to expand with a code example when I get some free time later. – Jay Feb 22 '17 at 17:35
  • @AlexeiLevenkov I've updated with a code snippet which explains the problem I am having. Forgetting anything else I have tried, the question now states my original problem, includes a repro which boils down the code pertaining to the original problem, and explains what I have done to make it work. My question is really: what is the correct way to make this work? What I have done with the ConcurrentQueue feels hacky. – Jay Feb 22 '17 at 18:26
  • The sample code is very strange (an infinite wait makes no sense at all, for example, and naming repeated calls recurrent?)... So far it sounds like you just need a standard producer-consumer implementation with the consumer being a single thread. There are plenty of existing posts (e.g. http://stackoverflow.com/questions/1656404/c-sharp-producer-consumer) and you seem to already have a reasonable implementation with a concurrent queue (you may want to post to [codereview.se] to have the working code reviewed)... so unless you have strong reasons to change, I'd recommend sticking with the queue. – Alexei Levenkov Feb 22 '17 at 18:57
  • @AlexeiLevenkov It was a quick example knocked up in 10 minutes to illustrate the problem. This is quite obviously not production code, just an attempt to show the problem without the extra bulk of a real implementation. I'll fix the naming error you pointed out for posterity. Thanks for the pointer to the producer-consumer pattern, as I was unaware of it. I think this is actually what I wanted (or maybe validation that I was on the right track), but I need to understand its implementation a bit more, as I have indicated to the poster of the first answer to this question. – Jay Feb 22 '17 at 19:11
  • [Async Producer/Consumer Queue using Dataflow](http://blog.stephencleary.com/2012/11/async-producerconsumer-queue-using.html) and [How to: Implement a Producer-Consumer Dataflow Pattern](https://msdn.microsoft.com/en-us/library/hh228601.aspx) articles could also be useful. – VMAtm Feb 22 '17 at 23:59

2 Answers

2

It's kinda hard to tell what you're trying to do from the code. But another way would be to use BlockingCollection<T>. The background threads could add values to that collection, and a single thread could sit and try to take values out of it, for example:

MyType value;
while (continueProcessingCollection)
{
    // TryTake waits up to the timeout, then returns false if nothing arrived
    if (blockingCollection.TryTake(out value, TimeSpan.FromSeconds(myTimeout)))
    {
        // process value, decide to toggle continueProcessingCollection
    }
}

The background thread could add new values to the blocking collection:

blockingCollection.Add(newValue);

And, somewhere you need to create the blockingCollection variable:

var blockingCollection = new BlockingCollection<MyType>();
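
Putting those pieces together, a minimal sketch of a single dedicated consumer thread might look like the following. The names SingleThreadWorker, Post and Demo are hypothetical and not part of any library; the sketch also swaps the TryTake loop for GetConsumingEnumerable, which blocks until an item arrives and ends once CompleteAdding has been called and the queue is drained. It assumes queued actions do not throw; an unhandled exception would end the worker thread.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs every queued action on one dedicated thread.
public sealed class SingleThreadWorker : IDisposable
{
    private readonly BlockingCollection<Action> _queue = new BlockingCollection<Action>();
    private readonly Thread _thread;

    public SingleThreadWorker()
    {
        _thread = new Thread(Consume) { IsBackground = true };
        _thread.Start();
    }

    // Safe to call from any thread; the action runs later on the worker thread.
    public void Post(Action action)
    {
        _queue.Add(action);
    }

    private void Consume()
    {
        // Blocks while the queue is empty; the loop ends after CompleteAdding()
        // has been called and all remaining items have been taken.
        foreach (Action action in _queue.GetConsumingEnumerable())
            action();
    }

    public void Dispose()
    {
        _queue.CompleteAdding(); // no more items will be accepted
        _thread.Join();          // wait for the queue to drain
        _queue.Dispose();
    }
}

public static class Demo
{
    // Posts `calls` actions from many threads and returns the distinct
    // managed thread ids the actions actually ran on.
    public static HashSet<int> Run(int calls)
    {
        var seen = new HashSet<int>(); // only touched by the worker thread until Join
        using (var worker = new SingleThreadWorker())
        {
            Parallel.For(0, calls, i =>
                worker.Post(() => seen.Add(Thread.CurrentThread.ManagedThreadId)));
        } // Dispose drains the queue and joins the worker thread
        return seen;
    }
}
```

With this sketch, Demo.Run(100) returns a set containing a single thread id, because every action executes on the worker thread regardless of which thread posted it. In the question's scenario, the calls into the third-party assembly would go inside the posted actions.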
Peter Ritchie
  • I've just looked this up on MSDN, so my understanding (now) is that if you invoke TryTake(), it will block until something is added to the collection? Then, when something is added, it will unblock and the code will carry on? – Jay Feb 22 '17 at 19:32
  • @jay it will return true if there was something in the collection, or false if it timed out. You need to know what criteria stop the loop (a timeout, number of things processed, etc.). But it will only block for as long as your timeout (I used 1 second for my example). – Peter Ritchie Feb 22 '17 at 19:37
  • Then this is exactly what I need - much better than my hacky wrapped concurrent queue. Thanks! – Jay Feb 22 '17 at 19:40
  • Should be a lot less code :) – Peter Ritchie Feb 22 '17 at 20:05
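
The TryTake behaviour described in these comments can be checked with a small sketch. TryTakeDemo and Run are hypothetical names, and the 50 ms timeout is an arbitrary choice for illustration:

```csharp
using System;
using System.Collections.Concurrent;

public static class TryTakeDemo
{
    // Returns "<first result> <second result> <value taken>".
    public static string Run()
    {
        using (var queue = new BlockingCollection<int>())
        {
            int value;

            // Empty collection: TryTake waits up to the timeout, then returns false.
            bool first = queue.TryTake(out value, TimeSpan.FromMilliseconds(50));

            // With an item present, TryTake returns true without waiting out the timeout.
            queue.Add(42);
            bool second = queue.TryTake(out value, TimeSpan.FromMilliseconds(50));

            return first + " " + second + " " + value;
        }
    }
}
```

Here TryTakeDemo.Run() returns "False True 42": the first call times out on the empty collection, and the second takes the item that was added.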
1

A delegate's BeginInvoke uses the thread pool to schedule the callback, which will not solve your problem. This is basically a producer-consumer problem with multiple producers and one consumer. .NET Core includes some structures that implement IProducerConsumerCollection<T>, which you can use to solve this problem.

Don't confuse this with Control.BeginInvoke (WinForms) and Dispatcher.BeginInvoke (WPF), which queue the callback on the GUI thread.

xwlantian
  • I will have a look at this pattern, as I was unaware that there is a .NET Core interface for this. Thanks, I will let you know how I get on. – Jay Feb 22 '17 at 18:51