I am running a code to receive messages from Azure Eventhub, similar to the official startup example. However, I am using Jupyter notebooks so I cannot use asyncio
to manage the tasks (if I understand correctly). I want to stop receiving messages when I cross a certain time threshold (e.g. 10 minutes before now) however the task continues running even if I raise an exception. The only way to stop it is by interrupting the kernel.
How to approach this? I am new to asynchronous programming so I might have misunderstood some concepts. Thank you in advance for any help.
A code example (raising exception that did not work):
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
from datetime import datetime, timedelta
async def on_event(partition_context, event):
# Print the event data.
result_data = []
for event in event_batch:
result_data.append(event.body_as_json(encoding='UTF-8'))
max_session_time = max([datetime.utcfromtimestamp(event['session']['serverTime']/1000) for event in result_data])
# Update the checkpoint so that the program doesn't read the events
# that it has already read when you run it next time.
await partition_context.update_checkpoint(event)
if datetime.utcnow() - progress_dict[partition_context.partition_id] < timedelta(minutes = 10):
print('Calculation finished')
raise Exception('Calculation finished')
async def main():
# Create an Azure blob checkpoint store to store the checkpoints.
checkpoint_store = BlobCheckpointStore.from_connection_string("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME")
# Create a consumer client for the event hub.
client = EventHubConsumerClient.from_connection_string("EVENT HUBS NAMESPACE CONNECTION STRING", consumer_group="$Default", eventhub_name="EVENT HUB NAME", checkpoint_store=checkpoint_store)
try:
async with client:
# Call the receive method. Read from the beginning of the partition (starting_position: "-1")
await client.receive_batch(on_event_batch = on_event_batch, #on_error = on_error,
max_batch_size = 100, starting_position="-1") #
except Exception as e:
if e.args == ('Calculation finished',):
print(e)
pass
else:
raise e