
The application uses .NET 4.6.1 and the Microsoft.Azure.ServiceBus.EventProcessorHost NuGet package v2.0.2, along with its dependency, the WindowsAzure.ServiceBus package v3.0.1, to process Azure Event Hub messages.

The application has an implementation of IEventProcessor. When an unhandled exception is thrown from the ProcessEventsAsync method, the EventProcessorHost never re-sends those messages to the running instance of IEventProcessor. (Anecdotally, it will re-send them if the hosting application is stopped and restarted, or if the lease is lost and re-obtained.)

Is there a way to force the event message that resulted in an exception to be re-sent by EventProcessorHost to the IEventProcessor implementation?

One possible solution is presented in this comment on a nearly identical question: Redeliver unprocessed EventHub messages in IEventProcessor.ProcessEventsAsync

The comment suggests holding a copy of the last successfully processed event message and checkpointing explicitly using that message when an exception occurs in ProcessEventsAsync. However, after implementing and testing such a solution, the EventProcessorHost still does not re-send. The implementation is pretty simple:

private EventData _lastSuccessfulEvent;

public async Task ProcessEventsAsync(
    PartitionContext context,
    IEnumerable<EventData> messages)
{
    try
    {
        await ProcessEvents(context, messages);     // does actual processing, may throw exception
        _lastSuccessfulEvent = messages
            .OrderByDescending(ed => ed.SequenceNumber)
            .First();
    }
    catch(Exception ex)
    {
        await context.CheckpointAsync(_lastSuccessfulEvent);
    }
}

An analysis of things in action: [image not included]

A partial log sample is available here: https://gist.github.com/ttbjj/4781aa992941e00e4e15e0bf1c45f316#file-gistfile1-txt

Chrisgh
  • Chrisgh: Unlike Service Bus, where messages can be leased individually, with Event Hubs checkpointing event X means that all messages up to and including X (X-1, X-2, etc.) are assumed to have been processed successfully. This means that, in a parallel environment, you may fail to process message X-1 but successfully process message X, checkpoint, and never attempt to reprocess message X-1. Is it possible that this scenario could be the root of the behavior you're seeing? – MSC Dec 07 '16 at 13:33
  • No, checkpoints, ie PartitionContext.CheckpointAsync(), only occur when all messages received by the IEventProcessor implementation are successfully processed. The IEventProcessor implementation never checkpoints if there is an unhandled exception. But if there is an unhandled exception the EventProcessorHost is never re-sending the "uncheckpointed" messages back to IEventProcessor.ProcessEventsAsync. – Chrisgh Dec 07 '16 at 15:08
  • We've attempted to work around this by "re-checkpointing" X-1, i.e., the last successfully processed event message, via PartitionContext.CheckpointAsync(_lastSuccessfulEvent), but EventProcessorHost still does not re-send. For clarity, this is tested with a single message on the partition at a time. – Chrisgh Dec 07 '16 at 15:08
  • Your test clearly shows this is not the case here, but one point to be careful of is that the _lastSuccessfulEvent probably needs to be partition-specific. I.e., you would keep a ConcurrentDictionary where the key was the partition ID and the value was the EventData. – RMD Dec 08 '16 at 14:21
  • Agreed, but this was a proof-of-concept to validate whether retrial would occur by "re-checkpointing". (It also should probably be ordering on the Offset in addition to the Sequence Number.) – Chrisgh Dec 08 '16 at 17:52
  • I didn't dig deeper, but a small suggestion: how about sending another event by cloning the failed EventData? – Kris Jan 03 '17 at 23:42
  • I believe that would result in duplicates for other consumers. Yes, those could be handled, but given this appears to be an issue with the host app and not the event hub itself, that seems like a janky workaround. – RMD Jan 04 '17 at 04:40
  • @RMD have you seen these? http://www.hivmr.com/db/sxa1a11dckmjjzpkd193d3x9zdsxdkz3 and http://www.hivmr.com/db/cc1mzcds8a88dpx1x1s3fcpakd193mzp – t1f Jan 06 '17 at 20:45

2 Answers


TL;DR: The only reliable way to replay a failed batch of events to IEventProcessor.ProcessEventsAsync is to shut down the EventProcessorHost (EPH) immediately, either by calling eph.UnregisterEventProcessorAsync() or by terminating the process, depending on the situation. This lets other EPH instances acquire the lease for the partition and start from the previous checkpoint.

Before explaining this, I want to call out that this is a great question and was indeed one of the toughest design choices we had to make for EPH. In my view, it was a trade-off between the usability and supportability of the EPH framework on one side and technical correctness on the other.

Ideal situation: when the user code in IEventProcessorImpl.ProcessEventsAsync throws an exception, the EPH library would not catch it. It would let the exception crash the process, so that the crash dump clearly shows the call stack responsible. I still believe this is the most technically correct solution.

Current situation: the contract between the IEventProcessorImpl.ProcessEventsAsync API and EPH is:

  1. As long as EventData can be received from the Event Hubs service, EPH continues invoking the user callback (IEventProcessorImplementation.ProcessEventsAsync) with the EventData. If the user callback throws, EPH notifies EventProcessorOptions.ExceptionReceived.
  2. User code inside IEventProcessorImpl.ProcessEventsAsync should handle all errors and incorporate retries as necessary. EPH does not set any timeout on this callback, which gives users full control over processing time.
  3. If a specific event is the cause of trouble, mark the EventData with a special property (for example, type=poison-event) and either re-send it to the same Event Hub (include a pointer to the original event by copying its EventData.Offset and SequenceNumber into the new EventData.ApplicationProperties), forward it to a Service Bus queue, or store it elsewhere. In short, identify the poison event and defer processing it.
  4. If you have handled all possible cases and are still running into exceptions, catch them and shut down EPH, or fail fast the process with the exception. When EPH comes back up, it will pick up where it left off.
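Points 2 to 4 of the contract can be sketched without touching the EPH library at all. The helper below is hypothetical: `BatchGuard`, its delegate parameters, and the retry settings are illustrative names, not part of any Microsoft package. It assumes the caller supplies three delegates: the per-event processing logic, a "defer" action for poison events (for example, forwarding to a queue), and a "shutdown" action that would typically wrap `eph.UnregisterEventProcessorAsync()`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper, not part of the EPH library: bounded retries per event,
// deferral of events that keep failing, and a host shutdown request as last resort.
public sealed class BatchGuard<TEvent>
{
    private readonly Func<TEvent, Task> _process;            // user logic, may throw
    private readonly Func<TEvent, Exception, Task> _defer;   // park a poison event elsewhere
    private readonly Func<Task> _shutdown;                   // e.g. wraps UnregisterEventProcessorAsync
    private readonly int _maxAttempts;
    private readonly TimeSpan _retryDelay;

    public BatchGuard(
        Func<TEvent, Task> process,
        Func<TEvent, Exception, Task> defer,
        Func<Task> shutdown,
        int maxAttempts,
        TimeSpan retryDelay)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        _process = process ?? throw new ArgumentNullException(nameof(process));
        _defer = defer ?? throw new ArgumentNullException(nameof(defer));
        _shutdown = shutdown ?? throw new ArgumentNullException(nameof(shutdown));
        _maxAttempts = maxAttempts;
        _retryDelay = retryDelay;
    }

    // Returns true when every event was either processed or deferred, which is
    // the only case in which the caller should checkpoint the batch.
    public async Task<bool> RunAsync(IEnumerable<TEvent> batch)
    {
        foreach (var e in batch)
        {
            Exception last = null;
            var done = false;

            // Point 2: retry inside the callback instead of letting the exception escape.
            for (var attempt = 1; attempt <= _maxAttempts && !done; attempt++)
            {
                try
                {
                    await _process(e);
                    done = true;
                }
                catch (Exception ex)
                {
                    last = ex;
                    if (attempt < _maxAttempts) await Task.Delay(_retryDelay);
                }
            }

            if (done) continue;

            try
            {
                // Point 3: the event is treated as poison and parked for later.
                await _defer(e, last);
            }
            catch (Exception)
            {
                // Point 4: nothing else worked, so ask the host to stop and do not checkpoint.
                await _shutdown();
                return false;
            }
        }
        return true;
    }
}
```

Inside a real `ProcessEventsAsync`, one would presumably call `RunAsync(messages)` and invoke `context.CheckpointAsync()` only when it returns true. Whether deferral or shutdown is the right response to a persistent failure depends on whether the application can tolerate processing events out of order.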

Why checkpointing 'the old event' does NOT work (read this to understand EPH in general):

Behind the scenes, EPH runs one pump per Event Hub consumer group partition receiver. The pump's job is to start the receiver from a given checkpoint (if present), create a dedicated instance of the IEventProcessor implementation, receive from the designated Event Hub partition starting at the offset in the checkpoint (or, if no checkpoint is present, at the one supplied by EventProcessorOptions.InitialOffsetProvider), and eventually invoke IEventProcessorImpl.ProcessEventsAsync. The purpose of the checkpoint is to make it possible to reliably resume processing messages when the EPH process shuts down and ownership of the partition moves to another EPH instance. So the checkpoint is consumed only while starting the pump and is NOT read again once the pump has started.
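The behavior described above can be illustrated with a toy model. This is a sketch under simplifying assumptions, not EPH's actual implementation: `ToyPump` and its members are made-up names, a partition is an in-memory list, and offsets are list indexes. The point it shows is that the stored checkpoint is read once, when the pump is constructed, and that writing a checkpoint later never moves the in-memory receiver position.

```csharp
using System;
using System.Collections.Generic;

// Toy model of a single partition pump. Illustrative only; none of these
// names are EPH types.
public sealed class ToyPump
{
    private readonly IReadOnlyList<string> _partition;  // events, indexed by offset
    private int _cursor;                                 // in-memory receiver position

    // Offset of the last checkpointed event; -1 means "no checkpoint yet".
    public int StoredCheckpoint { get; private set; }

    // The stored checkpoint is consulted here, at pump start, and nowhere else.
    public ToyPump(IReadOnlyList<string> partition, int storedCheckpoint)
    {
        _partition = partition ?? throw new ArgumentNullException(nameof(partition));
        StoredCheckpoint = storedCheckpoint;
        _cursor = storedCheckpoint + 1;
    }

    // Writing a checkpoint updates the store only; the receiver keeps its position.
    public void WriteCheckpoint(int offset) => StoredCheckpoint = offset;

    // Delivers the next event to the callback. The cursor advances even when the
    // callback throws, mirroring a host that reports the error and keeps going.
    public bool Pump(Action<string> callback)
    {
        if (_cursor >= _partition.Count) return false;
        var next = _partition[_cursor++];
        try
        {
            callback(next);
        }
        catch (Exception)
        {
            // A real host would surface this through its error notification hook.
        }
        return true;
    }
}
```

With events A, B, C and a callback that fails on B and "re-checkpoints" offset 0, three calls to `Pump` on the same instance deliver A, B, then C; B is not delivered a second time. Constructing a new `ToyPump` from the stored checkpoint, which is the analogue of another EPH instance taking over the partition, starts again at B.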

As I am writing this, EPH is at version 2.2.10.

more general reading on Event Hubs...

Sreeram Garlapati
  • The shutdown via UnregisterEventProcessorAsync was something that was considered and is the approach I'm leaning towards. Thanks for your detailed response, which validates that approach. – Chrisgh Jan 30 '17 at 16:29
  • Should we also retry when internal exceptions (like LeaseLostException, StorageException or EventHubException) are thrown? I dedicated a separate question to this and exception handling in general: https://stackoverflow.com/questions/50953546/ – Roy van der Valk Jun 20 '18 at 17:19

Simple answer: Have you tried EventProcessorHost.ResetConnection(string partitionId)?

Complex answer: It might be an architecture problem that needs to be addressed on your end. Why did the processing fail? Was it a transient error? Is retrying the processing logic a possible scenario? And so on.

Itay Podhajcer
  • Processing can fail for a variety of reasons but that shouldn't force you to "lose" events. Indeed, event hubs as a whole don't ever record the fact you processed an event. You tell them where you want to start reading. They'll delete events after the retention period, but that's about it. If this is a problem, it's a problem caused by assumptions or bugs made by MS when they wrote the EventProcessorHost and not assumptions made when choosing event hubs in general. – RMD Dec 22 '16 at 19:09