0

I have just started learning Masstransit and I would like to ask you one query on the accepted answer in this post. Here I have the same problem where consumer does not get the message. May be I have made some mistake or misunderstood some basic concept.

What I did:

  • Create a Topic in Azure service bus

  • Create a Subscription for the Topic

  • Send message into the topic as per the suggestion here. I can see from Azure portal that the message has been added into the Topic successfully.

  • Now I'm trying to consume the message by using the below code:

        private static async Task SetupMassTransit()
      {
          var services = new ServiceCollection();
          services.AddMassTransit(x =>
          {
              x.AddConsumer<MessageConsumer>(); //Consumer class
              x.UsingAzureServiceBus((context, cfg) =>
              {
                  cfg.Host(ServiceBusConnectionString);
                  // https://stackoverflow.com/questions/64229653/receiver-not-picking-up-message-with-mass-transit-subscription
                  cfg.SubscriptionEndpoint(TopicName, SubscriptionName, e =>
                  {
                      e.PrefetchCount = 100;
                      e.MaxConcurrentCalls = 100;
                      e.LockDuration = TimeSpan.FromMinutes(5);
                      e.MaxAutoRenewDuration = TimeSpan.FromMinutes(30);
    
                      e.UseMessageRetry(r => r.Intervals(100, 200, 500, 800, 1000));
                      e.UseInMemoryOutbox();
    
                      e.ConfigureConsumer<MessageConsumer>(context);
                  });
              });
          });
              var provider = services.BuildServiceProvider();
              IBusControl busControl = provider.GetRequiredService<IBusControl>();
              busControl.Start();
      }
    
  • I am expecting that the 'Consume' function should be called when I run the consumer project. But it does not. Also it creates fault topic though I'm unable to read it through Fault consumer.

      public class MessageConsumer : IConsumer<SendMessage>
      { 
      public Task Consume(ConsumeContext<SendMessage> context)
      {
          Console.WriteLine($"Message processed: SequenceNumber:{context.Message.customerId} - Text:{context.Message.messages}");
          return Task.CompletedTask;
      }
      }
    

Please note: The consumer and Producer use the same message object (i.e. inside same namespace)

Could you please tell me where actually I make the mistake? In advance thanks

Molay
  • 39
  • 7
  • One interesting thing , I have noticed that SubscriptionEndpoint function create new topic and subscription if a new topic and subscription name has been send. – Molay Apr 05 '21 at 21:10
  • What is TopicName? When you publish the message, does it go to that same topic? By default, topics are named based upon the message type. If you change that (as pointed out in the question linked), does it match TopicName? – Chris Patterson Apr 06 '21 at 00:02
  • TopicName is a Const AND Publisher and Consumer use the same TopicName. – Molay Apr 06 '21 at 07:36

2 Answers2

1

Thank you Daniel. DI might be the issue, though I have noticed that it's working now without DI (I will try to find out the reason why it was not working first time) One thing, As mention here

cfg.SubscriptionEndpoint("your-topic-name", "your-subscription-name", e =>

I think it should be

cfg.SubscriptionEndpoint("your-subscription-name", "your-topic-name", e =>

Final working source code:

        private static async Task SetupMassTransit(IServiceCollection services)
    {
        try
        {
            //services.AddScoped<IConsumer<SendMessage>, MessageConsumer>(); It's still working without this line
            services.AddMassTransit(x =>
            {
                x.AddConsumer<MessageConsumer>();
                x.UsingAzureServiceBus((context, cfg) =>
                {
                    cfg.Host(ServiceBusConnectionString);
                    cfg.SubscriptionEndpoint(SubscriptionName, TopicName, e =>
                    {
                       e.ConfigureConsumer<MessageConsumer>(context);
                    });
                });
            });
            var provider = services.BuildServiceProvider();
            IBusControl busControl = provider.GetRequiredService<IBusControl>();
            busControl.Start();
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
            throw;
        }
    }
Molay
  • 39
  • 7
0

I think you're adding the consumer before configuring it for servicebus (Need to doublecheck that). I did a different setup, where I used RecieveEndpoint instead of SubscriptionEndpoint, and it works.

                services.AddMassTransit(x =>
                {
                    x.UsingAzureServiceBus((context, cfg) =>
                    {
                        cfg.Host(ServiceBusConnectionString);
                        cfg.ReceiveEndpoint(typeof(SendMessage).Name.ToLower(), endpoint =>
                        {
                            endpoint.ConfigureConsumer<MessageConsumer >(context);
                        });
                    });
                    x.AddConsumer<MessageConsumer >();
                });
                services.AddMassTransitHostedService();

Everything else is kept as your example.

(Edit): I do realize that one is for topics and the other for queues, so be aware of that.