
I am running code to receive messages from Azure Event Hubs, similar to the official startup example. However, I am using a Jupyter notebook, so I cannot use asyncio to manage the tasks (if I understand correctly). I want to stop receiving messages once I cross a certain time threshold (e.g. events from within 10 minutes of now), but the task keeps running even if I raise an exception. The only way to stop it is to interrupt the kernel.

How should I approach this? I am new to asynchronous programming, so I may have misunderstood some concepts. Thank you in advance for any help.

A code example (raising an exception, which did not work):

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
from datetime import datetime, timedelta

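# Latest session time seen per partition (presumably filled in by the callback).
progress_dict = {}

async def on_event_batch(partition_context, event_batch):
    # Decode every event in the batch.
    result_data = [event.body_as_json(encoding='UTF-8') for event in event_batch]

    # Record the most recent server time (milliseconds since epoch) seen on this partition.
    max_session_time = max(datetime.utcfromtimestamp(data['session']['serverTime'] / 1000) for data in result_data)
    progress_dict[partition_context.partition_id] = max_session_time

    # Update the checkpoint so that the program doesn't read the events
    # that it has already read when you run it next time.
    await partition_context.update_checkpoint(event_batch[-1])

    # Try to stop once the events are less than 10 minutes old.
    if datetime.utcnow() - progress_dict[partition_context.partition_id] < timedelta(minutes=10):
        print('Calculation finished')
        raise Exception('Calculation finished')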

async def main():
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore.from_connection_string("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME")

    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string("EVENT HUBS NAMESPACE CONNECTION STRING", consumer_group="$Default", eventhub_name="EVENT HUB NAME", checkpoint_store=checkpoint_store)
    try:
        async with client:
            # Call the receive method. Read from the beginning of the partition (starting_position: "-1")
            await client.receive_batch(on_event_batch=on_event_batch,  # on_error=on_error,
                                       max_batch_size=100, starting_position="-1")
    except Exception as e:
        # Swallow only the sentinel exception raised in the callback.
        if e.args == ('Calculation finished',):
            print(e)
        else:
            raise
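
One direction that might work, sketched here without the Azure SDK: have the callback set an `asyncio.Event` instead of raising, and cancel the receive task from outside once that event is set. `fake_receive` and `run_until_done` are hypothetical names, and `fake_receive` is only a stand-in built on the assumption that the receive loop swallows exceptions raised in the callback, which would explain why raising does not stop it. With the real client, the task would presumably wrap `client.receive_batch(...)` instead.

```python
import asyncio


async def fake_receive(on_event_batch):
    """Hypothetical stand-in for the receive loop: runs forever and swallows
    callback errors (assumed behaviour), so raising cannot stop it."""
    batch_number = 0
    while True:
        batch_number += 1
        try:
            await on_event_batch(batch_number)
        except Exception:
            pass  # the loop keeps going regardless of callback errors
        await asyncio.sleep(0.01)


async def run_until_done(stop_after):
    done = asyncio.Event()
    seen = []

    async def on_event_batch(batch_number):
        seen.append(batch_number)
        # Placeholder for the real time-threshold check.
        if batch_number >= stop_after:
            done.set()  # signal the outer coroutine instead of raising

    # Run the receiver as a background task so it can be cancelled.
    receive_task = asyncio.create_task(fake_receive(on_event_batch))
    await done.wait()
    receive_task.cancel()
    try:
        await receive_task
    except asyncio.CancelledError:
        pass  # expected: the task was cancelled on purpose
    return seen
```

In a recent Jupyter notebook, `await run_until_done(3)` can usually be run directly in a cell, because the kernel already runs an event loop; in a plain script it would go through `asyncio.run(...)`.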
Veronika Vrana
