4

I have a classic asynchronous message dispatching problem. Essentially, I need to asynchronously dispatch messages and then capture the message response when the dispatch is complete. The problem is, I can't seem to figure out how to make any one request cycle self-expire and shortcircuit.

Here is a sample of the pattern I am using at the moment:

Defined delegate for invokation

private delegate IResponse MessageDispatchDelegate(IRequest request);

Dispatch messages with a callback

var dispatcher = new MessageDispatchDelegate(DispatchMessage);
dispatcher.BeginInvoke(requestMessage, DispatchMessageCallback, null);

Dispatch the message

private IResponse DispatchMessage(IRequest request)
{
  //Dispatch the message and throw exception if it times out
}

Get results of dispatch as either a response or an exception

private void DispatchMessageCallback(IAsyncResult ar)
{
  //Get result from EndInvoke(r) which could be IResponse or a Timeout Exception
}

What I can't figure out is how to cleanly implement the timeout/shortcircuit process in the DispatchMessage method. Any ideas would be appreciated

Ermias
  • 33
  • 6
JoeGeeky
  • 3,746
  • 6
  • 36
  • 53

2 Answers2

1
        var dispatcher = new MessageDispatchDelegate(DispatchMessage);

        var asyncResult = dispatcher.BeginInvoke(requestMessage, DispatchMessageCallback, null);
        if (!asyncResult.AsyncWaitHandle.WaitOne(1000, false))
        {
             /*Timeout action*/
        }
        else
        {
            response = dispatcher.EndInvoke(asyncResult);
        }
jvanrhyn
  • 2,804
  • 19
  • 14
  • Wouldn't this block calls that would otherwise begin the next parallel invokation of an overlapping dispatch and make this entire process syncronous? The goal would be to allow _n_-number of calls to dispatch messages in parallel. With that said, it is worth a test. – JoeGeeky Dec 03 '10 at 21:43
  • I made a test rig and tested the two patterns side-by-side. One with your suggestion and one without (e.g. no timeout support). My instincts were correct. This becomes a syncronous process which; as you might expect; takes a lot more time. As it turns out there are a number of other side-effects which have to be dealt with. To be performant the WaitHandle has to be closed explicitly and there are conditions when EndInvoke gets called twice which results in a costly exception (Ex. Called in the target callback method and in the 'else' block). I was able to see both these issues in my test rig. – JoeGeeky Dec 03 '10 at 22:33
  • 1
    @JoeGeeky: You could run several in parallel using `WaitHandle.WaitAny`. But using a new appdomain, as your answer does, is the only way to have any hope of aborting an operation that takes too long (and even with an appdomain, if the operation is running unmanaged code, abort may not be possible). – Ben Voigt Dec 28 '10 at 22:31
  • Thanks, I had not thought of Unmanaged implications, although I do not plan to use it in my case. – JoeGeeky Dec 28 '10 at 22:42
0

After lots of head-scratching I was finally able to find a solution for my original question. First off, let me say I got a lot of great responses and I tested all of them (commenting each with the results). The main problems were that all the proposed solutions led to either dead-locks (leading to 100% timeout scenario's) or made an otherwise Asyncronous process syncronous. I don't like answering my own question (first time ever), but in this case I took the advice of the StackOverflow FAQ since I've truely learned my own lesson and wanted to share it with the community.

In the end, I combined the proposed solutions with the invocation of delagates into alternate AppDomains. It's a bit more code and it's a little more expensive, but this avoids the dead-locks and allows fully asyncronous invocations which is what I required. Here are the bits...

First I needed something to invoke a delegate in another AppDomain

/// <summary>
/// Invokes actions in alternate AppDomains
/// </summary>
public static class DomainInvoker
{
    /// <summary>
    /// Invokes the supplied delegate in a new AppDomain and then unloads when it is complete
    /// </summary>
    public static T ExecuteInNewDomain<T>(Delegate delegateToInvoke, params object[] args)
    {
        AppDomain invocationDomain = AppDomain.CreateDomain("DomainInvoker_" + delegateToInvoke.GetHashCode());

        T returnValue = default(T);
        try
        {
            var context = new InvocationContext(delegateToInvoke, args);
            invocationDomain.DoCallBack(new CrossAppDomainDelegate(context.Invoke));

            returnValue = (T)invocationDomain.GetData("InvocationResult_" + invocationDomain.FriendlyName);
        }
        finally
        {
            AppDomain.Unload(invocationDomain);
        }
        return returnValue;
    }

    [Serializable]
    internal sealed class InvocationContext
    {
        private Delegate _delegateToInvoke;
        private object[] _arguments;

        public InvocationContext(Delegate delegateToInvoke, object[] args)
        {
            _delegateToInvoke = delegateToInvoke;
            _arguments = args;
        }

        public void Invoke()
        {
            if (_delegateToInvoke != null)
                AppDomain.CurrentDomain.SetData("InvocationResult_" + AppDomain.CurrentDomain.FriendlyName,
                    _delegateToInvoke.DynamicInvoke(_arguments));
        }
    }
}

Second I needed something to orchestrate collection of the required parameters and collect/resolve the results. This will also define the timeout and worker processes which will be called asyncronously in an alternate AppDomain

Note: In my tests, I extended the dispatch worker method to take random amounts of time to observe that everything worked as expected in both timeout and non-timeout cases

public delegate IResponse DispatchMessageWithTimeoutDelegate(IRequest request, int timeout = MessageDispatcher.DefaultTimeoutMs);

[Serializable]
public sealed class MessageDispatcher
{
    public const int DefaultTimeoutMs = 500;

    /// <summary>
    /// Public method called on one more many threads to send a request with a timeout
    /// </summary>
    public IResponse SendRequest(IRequest request, int timeout)
    {
        var dispatcher = new DispatchMessageWithTimeoutDelegate(SendRequestWithTimeout);
        return DomainInvoker.ExecuteInNewDomain<Response>(dispatcher, request, timeout);
    }

    /// <summary>
    /// Worker method invoked by the <see cref="DomainInvoker.ExecuteInNewDomain<>"/> process 
    /// </summary>
    private IResponse SendRequestWithTimeout(IRequest request, int timeout)
    {
        IResponse response = null;

        var dispatcher = new DispatchMessageDelegate(DispatchMessage);

        //Request Dispatch
        var asyncResult = dispatcher.BeginInvoke(request, null, null);

        //Wait for dispatch to complete or short-circuit if it takes too long
        if (!asyncResult.AsyncWaitHandle.WaitOne(timeout, false))
        {
            /* Timeout action */
            response = null;
        }
        else
        {
            /* Invoked call ended within the timeout period */
            response = dispatcher.EndInvoke(asyncResult);
        }

        return response;
    }

    /// <summary>
    /// Worker method to do the actual dispatch work while being monitored for timeout
    /// </summary>
    private IResponse DispatchMessage(IRequest request)
    {
        /* Do real dispatch work here */
        return new Response();
    }
}

Third I need something to stand-in for the actual thing that is asyncronously triggering the dispatches

Note: This is just to demonstrate the asyncronous behaviours I required. In reality, the First and Second items above demonstrate the isolation of timeout behaviours on alternate threads. This just demonstrates how the above resources are used

public delegate IResponse DispatchMessageDelegate(IRequest request);

class Program
{
    static int _responsesReceived;

    static void Main()
    {
        const int max = 500;

        for (int i = 0; i < max; i++)
        {
            SendRequest(new Request());
        }

        while (_responsesReceived < max)
        {
            Thread.Sleep(5);
        }
    }

    static void SendRequest(IRequest request, int timeout = MessageDispatcher.DefaultTimeoutMs)
    {
        var dispatcher = new DispatchMessageWithTimeoutDelegate(SendRequestWithTimeout);
        dispatcher.BeginInvoke(request, timeout, SendMessageCallback, request);
    }

    static IResponse SendRequestWithTimeout(IRequest request, int timeout = MessageDispatcher.DefaultTimeoutMs)
    {
        var dispatcher = new MessageDispatcher();
        return dispatcher.SendRequest(request, timeout);
    }

    static void SendMessageCallback(IAsyncResult ar)
    {
        var result = (AsyncResult)ar;
        var caller = (DispatchMessageWithTimeoutDelegate)result.AsyncDelegate;

        Response response;

        try
        {
            response = (Response)caller.EndInvoke(ar);
        }
        catch (Exception)
        {
            response = null;
        }

        Interlocked.Increment(ref _responsesReceived);
    }
}

In retrospect, this approach has some unintended consequences. Since the worker method occurs in an alternate AppDomain, this adds addition protections for exceptions (although it can also hide them), allows you to load and unload other managed assemblies (if required), and allows you to define highly constrained or specialized security contexts. This requires a bit more productionization but provided the framework to answer my original question. Hope this helps someone.

JoeGeeky
  • 3,746
  • 6
  • 36
  • 53