I have some experimental code that is just trying to make a simple scenario work. One client streams data to multiple services. The problem is that if one of the services does not shut down gracefully, I get an EndpointNotFoundException that I can't seem to handle. Below is my attempt at handling it, which fails. What I would really like is to remove the failed service channel from the list of channels and continue streaming data to the services that are still up and running. The timer simply gives the services a chance to start up before the data streaming starts.

Here prices is a seq of data streams; each stream is a seq of (line, delay) pairs that gets replayed.

type ReplayDataStream(prices) =
    let evt = new Event<_>()

    member x.Replay() =
        async {
            for line, delay in prices do
                do! Async.Sleep(delay)
                evt.Trigger(line)
        }
        |> Async.StartImmediate

    member x.PriceChanged = evt.Publish


let main() =
    let addresses = new ResizeArray<EndpointAddress>()

    let announcementService = new AnnouncementService()

    let createChannels addresses =
        let channels = new ResizeArray<IInputDataService>()
        for (address:EndpointAddress) in addresses do
            let channelFactory = new ChannelFactory<IInputDataService>(new BasicHttpBinding(), address)
            let channel = channelFactory.CreateChannel()
            (channel :?> ICommunicationObject).Faulted.Add(fun x ->
                (channel :?> ICommunicationObject).Abort()
                channels.Remove(channel) |> ignore)
            channels.Add(channel)
        channels

    let sendMessage(args:ElapsedEventArgs) =
        let channels = createChannels addresses
        for financialDataStream in prices do
        let replayDataStreamA = new ReplayDataStream(financialDataStream)
        for channel in channels do
            try
            //This is where it blows up and the try block isn't catching the exception.
            replayDataStreamA.PriceChanged.Add(channel.InputStringData)
            with
            | :? EndpointNotFoundException as ex -> Console.WriteLine(ex.ToString())
            | :? CommunicationException as ex -> Console.WriteLine(ex.ToString())
            | :? Exception as ex -> Console.WriteLine(ex.ToString())
            replayDataStreamA.Replay()

    let timer = new System.Timers.Timer()
    timer.Enabled <- true
    timer.AutoReset <- false
    timer.Interval <- 30000.0
    timer.Start()
    timer.Elapsed.Add(sendMessage)

    announcementService.OnlineAnnouncementReceived.Add(fun e ->
        Console.WriteLine(e.EndpointDiscoveryMetadata.Address)
        addresses.Add(e.EndpointDiscoveryMetadata.Address))

    announcementService.OfflineAnnouncementReceived.Add(fun e ->
        Console.WriteLine(e.EndpointDiscoveryMetadata.Address)
        addresses.Remove(e.EndpointDiscoveryMetadata.Address) |> ignore)

    let announcementServiceHost = new ServiceHost(announcementService)
    try
        announcementServiceHost.AddServiceEndpoint(new UdpAnnouncementEndpoint());
        announcementServiceHost.Open();
    with 
    | :? System.ServiceModel.CommunicationException as ex -> Console.WriteLine(ex.ToString())
    | :? System.TimeoutException as ex -> Console.WriteLine(ex.ToString())


    printfn "%s" "Hit any key to close."
    Console.ReadKey() |> ignore
Beaker
  • http://stackoverflow.com/questions/2458631/wcf-c-unable-to-catch-endpointnotfoundexception/2672872#2672872 – GregC May 27 '11 at 03:50
  • That's the solution I tried to implement in my code above. I suspect I may not be properly translating it to F#. – Beaker May 27 '11 at 04:03

2 Answers

After rewriting my code in C# it finally dawned on me what I was doing wrong. This is what the PriceChanged event handler should look like. I needed to catch the exception inside the lambda itself. Now I need to write something that actually looks like production code. :)

replayDataStreamA.PriceChanged.Add(fun x ->
    try
        channel.InputStringData x
    with
    | :? System.ServiceModel.CommunicationException as ex ->
        (channel :?> ICommunicationObject).Abort())

For posterity here is the entire method:

let sendMessage(args:ElapsedEventArgs) =
    if addresses.Count > 0 then
        for address in addresses do
            let channelFactory = new ChannelFactory<IInputDataService>(new BasicHttpBinding(), address)
            let channel = channelFactory.CreateChannel()
            for financialDataStream in prices do
                let replayDataStreamA = new ReplayDataStream(financialDataStream)
                replayDataStreamA.PriceChanged.Add(fun x ->
                    try
                        channel.InputStringData x
                    with
                    | :? System.ServiceModel.CommunicationException as ex ->
                        (channel :?> ICommunicationObject).Abort())
                replayDataStreamA.Replay()
Beaker

Explanation by Sky Sanders makes a lot of sense, and should work for this scenario. Here's a link to the blog.

Providing a subscriber to the Faulted event does not do exactly the same thing as calling channel.Abort() inside an exception handler.

PriceChanged.Add() is the equivalent of PriceChanged += in C#: you're subscribing a handler to the PriceChanged event. A try/with block around that call only catches exceptions thrown while subscribing (think of a custom Add/Remove implementation in your event), and that's not what you want. You're looking for a way to handle the exception thrown later, when the handler calls InputStringData. This thought process leads naturally to your solution.
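
To make the difference concrete, here is a minimal sketch that needs no WCF at all. The function failingSend is a hypothetical stand-in for channel.InputStringData on a dead endpoint, and the event names are made up for illustration.

```fsharp
open System

// Hypothetical stand-in for channel.InputStringData on a dead endpoint.
let failingSend (_: string) : unit =
    raise (InvalidOperationException "endpoint not found")

let log = ResizeArray<string>()

// Attempt 1: try/with around the subscription. Add itself does not throw,
// so this handler never runs.
let guardedSubscription = Event<string>()
try
    guardedSubscription.Publish.Add(failingSend)
with ex ->
    log.Add("caught while subscribing: " + ex.Message)

// The exception only appears when the event fires, and it escapes Trigger.
let escaped =
    try
        guardedSubscription.Trigger "price"
        false
    with :? InvalidOperationException -> true

// Attempt 2: try/with inside the handler, where the call actually happens.
let guardedHandler = Event<string>()
guardedHandler.Publish.Add(fun line ->
    try
        failingSend line
    with :? InvalidOperationException as ex ->
        log.Add("caught in handler: " + ex.Message))
guardedHandler.Trigger "price"
```

After this runs, the only log entry comes from the handler-side try/with; the block around Add never sees the exception.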

In C# production code, place a try/catch block around the point where the event is raised. Catch any exception thrown by a subscriber, Debug.Assert, and rethrow, warning the developer that all exceptions should be handled subscriber-side. In your code, this means a try/with block that warns and rethrows at evt.Trigger().
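
A rough sketch of that event-side guard in F# might look like the following. GuardedStream and its member names are hypothetical, and eprintfn stands in for the Debug.Assert warning so the sketch behaves the same in debug and release builds.

```fsharp
open System

// Hypothetical wrapper: raises the event and warns if a subscriber leaks an exception.
type GuardedStream<'T>() =
    let evt = Event<'T>()

    member x.Changed = evt.Publish

    member x.Raise(value: 'T) =
        try
            evt.Trigger value
        with ex ->
            // Warn the developer, then let the original exception continue upward.
            eprintfn "Subscriber did not handle its own exception: %s" ex.Message
            reraise ()

let stream = GuardedStream<string>()
stream.Changed.Add(fun _ -> failwith "unhandled in subscriber")

// The guard does not swallow the exception; it only reports it on the way out.
let rethrown =
    try
        stream.Raise "price"
        false
    with _ -> true
```

The guard is a diagnostic aid, not a recovery mechanism: the subscriber is still expected to catch its own exceptions.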

You could expose the async block instead of running it at the point of declaration. This should provide you with orchestration powers at a higher level: within sendMessage. There's a special API to catch exceptions, handle cancellation and timeouts in one central place that's really worth looking into.
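
As one possible reading of this suggestion (the API is not named above, so Async.Catch and the timeout argument of Async.RunSynchronously are my assumption), Replay could return the workflow and the caller could decide how to run it. The sample data and the failing handler below are made up for illustration.

```fsharp
open System

type ReplayDataStream(prices: seq<string * int>) =
    let evt = Event<string>()

    member x.PriceChanged = evt.Publish

    // Return the workflow instead of starting it here.
    member x.Replay() : Async<unit> =
        async {
            for line, delay in prices do
                do! Async.Sleep delay
                evt.Trigger line
        }

// Made-up sample data: (line, delay in milliseconds).
let stream = ReplayDataStream([ "a", 10; "b", 10 ])
let received = ResizeArray<string>()

// Hypothetical handler that fails on the second line, like a dead endpoint.
stream.PriceChanged.Add(fun line ->
    if line = "b" then raise (InvalidOperationException "endpoint not found")
    received.Add line)

// The caller handles failure and the timeout in one place.
let outcome =
    let guarded = stream.Replay() |> Async.Catch
    match Async.RunSynchronously(guarded, timeout = 5000) with
    | Choice1Of2 () -> "completed"
    | Choice2Of2 ex -> "failed: " + ex.Message
```

Here the handler's exception surfaces as Choice2Of2 in the caller rather than on whatever thread happens to resume the workflow.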

GregC
  • That's basically the solution I tried to implement in my code above. Perhaps my translation of it to F# is incorrect somehow? – Beaker May 27 '11 at 03:57
  • The line that throws is commented above it stating "this is where it blows up". – Beaker May 27 '11 at 04:13
  • The code works, it simply doesn't recover if a single service shuts down in an ungraceful manner. Maybe you could paste it in an fsi session and see if something jumps out at you. – Beaker May 27 '11 at 04:17
  • I tried both of the line offsets you suggested, neither has any impact. – Beaker May 27 '11 at 04:58
  • I also think that providing a subscriber to the Faulted event does not do exactly the same thing as calling channel.Abort() inside an exception handler. – GregC May 27 '11 at 13:01
  • I'm rewriting this in C# to see if the problem becomes clear in a language I understand much better. – Beaker May 27 '11 at 17:51