
Following the EventProcessorHost example, we implemented our custom logic in onEvents(). Some data is not being processed, and I suspect this is related to the exceptions the Java client logs as warnings.

In the log we see StorageException (time-outs on blob storage when renewing leases or checkpointing), LeaseLostException (probably caused by the previous exception) and EventHubException (when the event hub moves or goes offline for a short period).

Basically, my question is: how do these exceptions affect the processing of events, and how can we make sure no events are skipped (e.g. via exception handling with retries, and a complete shutdown as a last resort)?

I read through the docs and searched through other questions but was unable to find a satisfactory answer (this and this one provide some insight).

Our code:

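import java.nio.charset.StandardCharsets;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.PartitionContext;

public class EventProcessor implements IEventProcessor {
    ...
    @Override
    public void onEvents(PartitionContext context, Iterable<EventData> events) throws Exception {
        for (EventData event : events) {
            try {
                String message = new String(event.getBytes(), StandardCharsets.UTF_8);

                mystuff.process(message);

                // Checkpoint every 50 events and block until the write completes.
                this.checkpointBatchingCount++;
                if ((checkpointBatchingCount % 50) == 0) {
                    context.checkpoint(event).get();
                }
            } catch (Exception e) {
                // The failed event is only logged, not retried; the next
                // successful checkpoint for a later event moves past it.
                LOG.warn("Processing event failed: {}", e.getMessage());
            }
        }
    }
    ...
}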
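One way events can be skipped follows from the code itself: the catch block logs a processing failure and moves on, and a later checkpoint then records a position past the failed event, so it is not delivered again after a restart. A minimal sketch of the alternative the question describes (retry with backoff, then give up loudly) is below. `RetryHelper`, `withRetry`, and the attempt and delay values are hypothetical names chosen for illustration; they are not part of the Event Hubs client.

```java
import java.util.concurrent.Callable;

// Hypothetical helper (not part of the Event Hubs client): runs a task,
// retrying with exponential backoff, and rethrows the last exception once
// maxAttempts is exhausted so a failure is never silently swallowed.
final class RetryHelper {
    private RetryHelper() {}

    static <T> T withRetry(Callable<T> task, int maxAttempts, long initialDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delayMillis = initialDelayMillis;
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                lastFailure = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis); // back off before retrying
                    delayMillis *= 2;          // double the wait each time
                }
            }
        }
        // Retries exhausted: surface the failure to the caller, which can
        // decide to stop processing (e.g. shut the host down).
        throw lastFailure;
    }
}
```

Inside onEvents() this could wrap the call as `RetryHelper.withRetry(() -> { mystuff.process(message); return null; }, 5, 100);` with the catch-all removed, so a permanent failure propagates out of onEvents() instead of being checkpointed past. What the host does with an exception thrown from onEvents() depends on the library version and would need to be checked before relying on it as the "shutdown as a last resort".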
Roy van der Valk
  • I've been searching around for your exact issue and was wondering whether you ended up finding a solution. I looked into the PartitionReceiveHandler that's part of the eventhubs package (com.microsoft.azure.eventhubs.*); it contains an onReceive method, but I'm unsure whether it solves the issue at hand. It seems that IEventProcessor doesn't work as a listener: upon calling registerEventProcessor, only events up to that point in time are fetched in the onEvents() method; then it hangs with no error or exit, and to get further messages from the EventHub we'd have to re – Cherisle Jun 12 '19 at 17:57
  • Honestly, I have been pretty disappointed in the Azure Event Hub Java client; we ran into problem after problem (check out my issues on their GitHub repo). I am glad the company I worked for at the time moved away from Azure Event Hub completely and migrated to another streaming platform at a later stage. – Roy van der Valk Jun 13 '19 at 15:25

1 Answer


From my understanding of the EventProcessor, these exceptions cause events to be reprocessed rather than missed. If events are being skipped, there may be another underlying issue.

When you call checkpoint, the processor persists the offset and sequence number of that EventData to blob storage, recording "I've processed everything up to this event."

When you get a StorageException, the sequence number was not persisted, so the checkpoint for an older event remains in your blob storage. If an EventHubException disconnects the processor, then when it restarts it tries to claim any leases that have expired and resumes processing from the last checkpoint that was written successfully. Any events processed after that checkpoint are delivered again.
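The checkpoint-and-resume reasoning above can be checked with a toy model. The sketch below is a plain-Java simulation, not the EventProcessorHost API: sequence numbers stand in for EventData, an int stands in for the checkpoint in blob storage, a dropped write stands in for a StorageException, and `CheckpointSimulation` and `run` are hypothetical names. It checkpoints every 5 events (the question uses 50), loses the write for event 9, stops after event 11, then restarts from whichever checkpoint survived.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy model of checkpoint/resume semantics; NOT the EventProcessorHost API.
// Events are sequence numbers 0..total-1, and the "checkpoint store" holds
// the last sequence number reported as processed (-1 means none yet).
final class CheckpointSimulation {
    private CheckpointSimulation() {}

    // Processes events after `checkpoint`, writing a checkpoint after every
    // `interval` events. Writes for sequence numbers in `failedWrites` are
    // lost (standing in for a StorageException). Processing stops after
    // event `stopAfter` (a crash or lost lease). Returns the stored checkpoint.
    static int run(int checkpoint, int total, int interval, int stopAfter,
                   Set<Integer> failedWrites, List<Integer> processed) {
        int stored = checkpoint;
        for (int seq = checkpoint + 1; seq < total && seq <= stopAfter; seq++) {
            processed.add(seq);                        // "process" the event
            if ((seq + 1) % interval == 0 && !failedWrites.contains(seq)) {
                stored = seq;                          // checkpoint write succeeded
            }
        }
        return stored;
    }

    public static void main(String[] args) {
        List<Integer> processed = new ArrayList<>();
        // First run: checkpoints are due at 4 and 9, the write for 9 fails,
        // and the processor goes down after event 11.
        int checkpoint = run(-1, 15, 5, 11, Set.of(9), processed);
        // Restart: resume after the last successful checkpoint (event 4).
        checkpoint = run(checkpoint, 15, 5, Integer.MAX_VALUE, Set.of(), processed);
        System.out.println("final checkpoint = " + checkpoint);
        System.out.println("processed = " + processed);
    }
}
```

Running main prints a final checkpoint of 14; events 5 through 11 appear twice in `processed` and none are missing. That is the at-least-once behavior described in the answer: failures lead to reprocessing, so the logic in onEvents() would need to tolerate seeing the same event more than once.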

You'll get a LeaseLostException if another event processor "stole" the partition you were processing. This happens when multiple instances of the EventProcessor are running and the client balances the partitions between them.
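To see why starting a second instance produces a LeaseLostException on the first, consider an even split of partitions across hosts. The sketch below is illustrative only: EventProcessorHost's real balancing algorithm is internal to the library and may differ, and the 32-partition figure and the `FairShare`/`targets` names are assumptions.

```java
import java.util.Arrays;

// Illustrative model only (not the EventProcessorHost balancing code):
// the number of partitions each host would own under an even split.
final class FairShare {
    private FairShare() {}

    // The first (partitions % hosts) hosts each get one extra partition.
    static int[] targets(int partitions, int hosts) {
        if (hosts < 1) {
            throw new IllegalArgumentException("hosts must be at least 1");
        }
        int[] share = new int[hosts];
        for (int i = 0; i < hosts; i++) {
            share[i] = partitions / hosts + (i < partitions % hosts ? 1 : 0);
        }
        return share;
    }

    public static void main(String[] args) {
        for (int hosts = 1; hosts <= 3; hosts++) {
            System.out.println(hosts + " host(s): " + Arrays.toString(targets(32, hosts)));
        }
    }
}
```

This prints `[32]`, `[16, 16]` and `[11, 11, 10]`. Under this model, going from one host to two means the new host takes 16 leases that the first host currently holds, and the first host sees a LeaseLostException for each partition it gives up.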

Connie Yau