
I am trying to consume IoT Hub messages with EventHubConsumerClient in Python. (I also tried the C# EventProcessorClient; it seems to have the same problem.)

I am running the EventHubConsumerClient in a VM.

The setup:

  • IoT Hub accessible from the internet.
  • User-assigned managed identity, added to the VM.
  • The user-assigned identity has the role "IoT Hub Data Reader" with scope set to the IoT Hub.
  • BlobCheckpointStore connected to a blob storage account, authenticated with the managed identity (I checked that this works).
  • Region West Europe

Everything works fine if I use the Event Hub-compatible connection string to connect to the IoT Hub. But when I use the managed identity credential instead, I get the following error:

evtconsumer    | INFO:uamqp.c_uamqp:Token put complete with result: 3, status: 401, description: b'InvalidIssuer: Token issuer is invalid. TrackingId:0315ff67-60c5-4bb2-ba6d-160f45eb91eb, SystemTracker:NoSystemTracker, Timestamp:2021-09-15T10:30:05', connection: b'a44766f1-5d50-4d61-958e-52f7529315a4'
evtconsumer    | INFO:uamqp.authentication.cbs_auth:Authentication status: 401, description: b'InvalidIssuer: Token issuer is invalid. TrackingId:0315ff67-60c5-4bb2-ba6d-160f45eb91eb, SystemTracker:NoSystemTracker, Timestamp:2021-09-15T10:30:05'
evtconsumer    | INFO:uamqp.authentication.cbs_auth:Authentication Put-Token failed. Retrying.
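
Since the service complains about the token issuer, one way to narrow this down might be to inspect the `iss` and `aud` claims of the token that the credential actually returns. The following is a minimal sketch using only the standard library; the helper names `decode_jwt_claims` and `summarize_claims` are hypothetical, and the payload is decoded without verifying the signature, so it is only suitable for debugging.

```python
import base64
import json


def decode_jwt_claims(token: str) -> dict:
    """Return the claims of a JWT as a dict, WITHOUT verifying the signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected a JWT with three dot-separated parts")
    payload = parts[1]
    # base64url payloads omit padding; restore it before decoding.
    payload += "=" * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(payload))


def summarize_claims(claims: dict) -> dict:
    """Pick the claims most relevant to an issuer or audience mismatch."""
    return {key: claims.get(key) for key in ("iss", "aud", "tid")}
```

With the credential from my code, the token string could be fetched with `credential.get_token("https://eventhubs.azure.net/.default").token` and passed to `decode_jwt_claims`. That scope is an assumption about what the Event Hubs client requests on my behalf; I have not confirmed it.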

My code:

import logging
import os
import sys
from azure.identity import ManagedIdentityCredential
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

logging.basicConfig(stream=sys.stdout, level=logging.INFO)
_logger = logging.getLogger()

azure_client_id = os.getenv("AZURE_CLIENT_ID")
evthubnamespace = os.getenv("IOTHUB_EVTHUB_FULLY_QUALIFIED_NAMESPACE")
evthubname = os.getenv("IOTHUB_EVTHUB_NAME")
evthubconnectionstring = os.getenv("IOTHUB_EVTHUB_CONNECTION_STRING")
blob_account_url = os.getenv("BLOB_ACCOUNT_URL")
blob_container_name = os.getenv("BLOB_CONTAINER_NAME")

# for toggling between authentication methods:
use_connection_string = os.getenv("USE_CONNECTION_STRING") == "true"

credential = ManagedIdentityCredential(client_id=azure_client_id)

def on_event(partition_context, event):
    # Print the event data.
    _logger.info("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))

    # Update the checkpoint so that the program doesn't read the events
    # that it has already read when you run it next time.
    partition_context.update_checkpoint(event)

def main():
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore(
        credential=credential,
        blob_account_url=blob_account_url,
        container_name=blob_container_name)

    if use_connection_string:
        # this works fine
        _logger.info("Using connection string")
        client = EventHubConsumerClient.from_connection_string(
            evthubconnectionstring,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store)
    else:
        # This causes errors
        _logger.info(f"Using managed identity. fully_qualified_namespace: {evthubnamespace} eventhub_name: {evthubname}")
        client = EventHubConsumerClient(
            fully_qualified_namespace=evthubnamespace,
            eventhub_name=evthubname,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential)
    with client:
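        # Call the receive method. No starting_position is passed, so partitions
        # without a checkpoint start at the default "@latest"; pass
        # starting_position="-1" to read from the beginning of the partition.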
        client.receive(on_event=on_event)

if __name__ == '__main__':
    main()
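
To rule out a mismatch between the two branches, the namespace and event hub name used with the managed identity can be compared against what the working connection string contains. This is a rough sketch, assuming the usual `Endpoint=sb://...;...;EntityPath=...` layout of an Event Hub-compatible connection string; the function name `parse_eventhub_connection_string` is hypothetical and not part of the SDK.

```python
from urllib.parse import urlparse


def parse_eventhub_connection_string(conn_str: str) -> dict:
    """Extract the host name and entity path from a connection string."""
    fields = {}
    for part in conn_str.split(";"):
        if not part.strip():
            continue
        # Split on the first "=" only, because key values may end in "=".
        key, _, value = part.partition("=")
        fields[key.strip().lower()] = value.strip()
    # The endpoint looks like sb://<namespace>.servicebus.windows.net/
    host = urlparse(fields.get("endpoint", "")).hostname
    return {
        "fully_qualified_namespace": host,
        "eventhub_name": fields.get("entitypath"),
    }
```

The returned values can then be compared with `evthubnamespace` and `evthubname` before constructing the client.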

I am all out of ideas with this one. It seems that the Event Hub-compatible AMQP endpoint of the IoT Hub does not accept tokens generated from the managed identity credential. Is that the case?

  • Were you able to figure this out? I'm experiencing the same issue. – 에이바바 May 10 '22 at 01:57
  • No. We moved away from IoT Hub towards doing everything ourselves in a VM. Far simpler and easier to calculate. There were other issues that put us off using the Azure IoT services. – Wooyay May 11 '22 at 05:27

0 Answers