I want to ensure that, if my Event Hubs client (currently a console application) crashes, it only picks up events it has not yet taken from the event hub. One way to achieve this is to use offsets. However, to my understanding, this requires the client to store the latest offset itself. Besides, events do not necessarily seem to reach the foreach loop of the ProcessEventsAsync method ordered by SequenceNumber.

An alternative is to use checkpoints. I think they are persisted via the server (the event hub) using the provided storage account credentials. Is this correct?

This is some preliminary code I am currently using:

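using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

public class SimpleEventProcessor : IEventProcessor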
{
    private Stopwatch _checkpointStopWatch;

    async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }

    Task IEventProcessor.OpenAsync(PartitionContext context)
    {
        Console.WriteLine("SimpleEventProcessor initialized.  Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
        _checkpointStopWatch = new Stopwatch();
        _checkpointStopWatch.Start();
        return Task.FromResult<object>(null);
    }

    async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (var eventData in messages)
        {
            // do something                    
        }

        //Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts.
        if (_checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
        {
            await context.CheckpointAsync();
            _checkpointStopWatch.Restart();
        }
    }
}

I believe it sends a checkpoint to the server every 5 minutes. How does the server know which client has submitted the checkpoint (via the context)? Also, how can I prevent events from being processed again if the client restarts? Furthermore, there could still be a window of up to 5 minutes in which events are processed again. Perhaps I should rather use a queue/topic, given my requirement?

PS:

This seems to be sufficient:

async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
    foreach (var eventData in messages)
    {
        // do something
    }
    await context.CheckpointAsync();
}
kdazzle
cs0815
  • In your code, can we not guarantee that checkpoints are updated every 5 minutes for all partitions? Is there a better way to perform a checkpoint for all partitions every 5 minutes? – Nikesh Devaki Apr 02 '19 at 10:17

1 Answer

Let me lay out some basic terminology before answering:

Event Hubs is a high-throughput, durable event ingestion pipeline. Simply put, it is a reliable stream of events in the cloud.

The Offset on EventData (one event in the stream) is literally a cursor on the stream. Having this cursor enables operations such as restarting the read from that cursor (i.e. the Offset), inclusive or exclusive.

The EventProcessor library is a framework that the Event Hubs team built on top of the ServiceBus SDK to make "eventhub receiver gu" look easier. EPH is to Event Hubs what ZooKeeper is to Kafka. It makes sure that when the process running an EventProcessor on a specific partition dies or crashes, processing is resumed from the last checkpointed offset in another available EventProcessorHost instance.

Checkpoint: as of today, Event Hubs only supports client-side checkpointing. When you call Checkpoint from your client code:

await context.CheckpointAsync();

This translates into a call to Azure Storage (made directly from the client), which stores the current offset in the storage account you provided. The Event Hubs service does not talk to Storage for checkpointing.

THE ANSWER

The EventProcessor framework is meant to achieve exactly what you are looking for.

Checkpoints are not persisted via the server (i.e. the Event Hubs service). Checkpointing is purely client-side: you are talking to Azure Storage. That is the reason the EventProcessor library brings in an additional dependency, AzureStorageClient. You can connect to the storage account and the container to which the checkpoints are written. There we maintain the ownership information: which EPH instances (by name) own which Event Hubs partitions, and up to which checkpoint they have read/processed.

As for the timer-based checkpointing pattern you originally had: if the process goes down, you will redo the events from the last 5-minute window. This is a healthy pattern because:

  1. The fundamental assumption is that faults are rare events, so you will rarely have to deal with duplicate events.
  2. You end up making fewer calls to the Storage service (which you could easily overwhelm by checkpointing frequently). I would go one step further and fire the checkpoint call asynchronously. ProcessEventsAsync need not fail if a checkpoint fails!
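The "fire the checkpoint asynchronously and do not fail on checkpoint errors" idea could be sketched roughly as below. This is only an illustration under assumptions: ThrottledCheckpointer, TryCheckpoint, FailureCount and InFlight are hypothetical names, not part of the EventProcessorHost API, and the checkpoint itself is passed in as a delegate (for example `() => context.CheckpointAsync()`) so the sketch does not depend on any SDK type.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not an SDK type): starts a checkpoint at most once per
// interval, does not block the caller, and swallows checkpoint failures.
public sealed class ThrottledCheckpointer
{
    private readonly TimeSpan _interval;
    private readonly Stopwatch _sinceLast = Stopwatch.StartNew();
    private Task _inFlight = Task.CompletedTask;
    private int _failureCount;

    public ThrottledCheckpointer(TimeSpan interval)
    {
        _interval = interval;
    }

    // Number of checkpoint attempts that failed so far (useful for logging/alerts).
    public int FailureCount { get { return _failureCount; } }

    // The most recently started checkpoint; await it in CloseAsync if desired.
    public Task InFlight { get { return _inFlight; } }

    // Call at the end of ProcessEventsAsync. Returns true if a checkpoint was started.
    public bool TryCheckpoint(Func<Task> checkpoint)
    {
        // Skip if the interval has not elapsed or the previous attempt is still running.
        if (_sinceLast.Elapsed < _interval || !_inFlight.IsCompleted)
        {
            return false;
        }

        _sinceLast.Restart();
        _inFlight = RunSafelyAsync(checkpoint); // deliberately not awaited
        return true;
    }

    private async Task RunSafelyAsync(Func<Task> checkpoint)
    {
        try
        {
            await checkpoint();
        }
        catch (Exception ex)
        {
            // A failed checkpoint only widens the replay window; processing continues.
            Interlocked.Increment(ref _failureCount);
            Console.Error.WriteLine("Checkpoint failed: " + ex.Message);
        }
    }
}
```

In a processor this would be used as one instance per IEventProcessor (i.e. per partition), with `_checkpointer.TryCheckpoint(() => context.CheckpointAsync());` as the last statement of ProcessEventsAsync. Because the call is not awaited, it is worth checking how your SDK version determines the position being checkpointed; passing the last event of the batch explicitly, if such an overload is available to you, avoids any ambiguity.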

If you want absolutely no events to repeat, you will need to build de-duplication logic into the downstream pipeline.

  • Every time the EventProcessorImpl starts, query your downstream for the last sequence number it received, and keep discarding events until you are past that sequence number.
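A minimal sketch of that discard-until-caught-up logic follows. It assumes that sequence numbers increase within a single partition, so state is tracked per partition. SequenceDeduplicator and its members are hypothetical names, and how you query the downstream for its last sequence number is left out because it depends entirely on your sink.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: remembers, per partition, the highest sequence number the
// downstream has already received, and filters out anything at or below it.
public sealed class SequenceDeduplicator
{
    private readonly Dictionary<string, long> _lastProcessed =
        new Dictionary<string, long>();

    // Call once on startup (e.g. from OpenAsync) with the value read from the downstream.
    public void Seed(string partitionId, long lastSequenceNumber)
    {
        _lastProcessed[partitionId] = lastSequenceNumber;
    }

    // True if this event has not yet been handed to the downstream.
    public bool ShouldProcess(string partitionId, long sequenceNumber)
    {
        long last;
        return !_lastProcessed.TryGetValue(partitionId, out last)
               || sequenceNumber > last;
    }

    // Call after the downstream has accepted the event.
    public void MarkProcessed(string partitionId, long sequenceNumber)
    {
        long last;
        if (!_lastProcessed.TryGetValue(partitionId, out last) || sequenceNumber > last)
        {
            _lastProcessed[partitionId] = sequenceNumber;
        }
    }
}
```

Inside the foreach loop of ProcessEventsAsync, each event would then be guarded by something like `if (!dedup.ShouldProcess(context.Lease.PartitionId, eventData.SequenceNumber)) continue;`, followed by MarkProcessed once the sink write succeeds. This only removes duplicates if the sink write and the bookkeeping in the downstream are themselves reliable; the in-memory dictionary is lost on a crash, which is why it is re-seeded from the downstream on every start.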

here's more general reading on Event Hubs...

Sreeram Garlapati
  • Thanks for your lengthy reply. Yes, checkpoints are persisted 'server side/in the cloud', namely in the Azure storage account linked to the event hub. This is in contrast to offsets, which have to be persisted client side. Can you please provide some more 'transactional code', which also ensures that the events are passed on to the target event sink in the correct order? Thanks! – cs0815 Feb 18 '16 at 06:53
  • Thanks! What do you mean by 'storage account linked to eventhub'? There is no such thing. The storage account you provide to EventProcessorHost is purely client-side. As I said, the API context.CheckpointAsync() directly calls Azure Storage and checkpoints the current offset. I am guessing you write to the 'target event sink' in ProcessEventsAsync. Doing this transactionally is only possible if you use the same Azure Table as the sink and as the store for the checkpointed offsets. By design, EventProcessor is more suited to receiving applications that are tolerant of duplicate events. – Sreeram Garlapati Feb 18 '16 at 07:34
  • In short, just to be clear: the Event Hubs service is completely unaware that you are checkpointing to Azure Storage. The EventProcessor library only helps with the job of checkpointing the offset (and managing leases across multiple instances) using the Azure Storage library. – Sreeram Garlapati Feb 18 '16 at 07:41
  • Ok thanks. Would you mind providing a 'transactional code snippet' as mentioned above? Thanks! – cs0815 Feb 18 '16 at 08:07
  • Sreeram, continuous checkpointing to an Azure storage account can cost more money! Apart from increasing the delay between consecutive checkpoint transactions, are there any more economical ways? Can we maintain checkpoints on the local machine? – Nikesh Devaki Apr 01 '19 at 12:30
  • Do you see any challenges in implementing local checkpoint manager? – Nikesh Devaki Apr 01 '19 at 13:37