2

In my ASP.Net Core 6 application, a BackgroundService task called MqttClientService runs a MQTTNet client that handles incoming mqqt messages and responds with a message to indicate it was successful.

I have gotten the sample console app from the MQTTNet repo to work using Console.ReadLine(), however this feels like a hack for my use case. Is there a better way to keep the BackgroundService handling incoming messages without restarting constantly?

There is an example with Asp.Net Core and MQTTNet version 3, but it uses handles implemented by interfaces rather than async events that the library now uses: the MQTTNet's Upgrading Guide.

Any information will be appreciated, thank you.

MqttClientService.cs in Services/

using MQTTnet;
using MQTTnet.Client;
using System.Text;

namespace MqttClientAspNetCore.Services
{
    public class MqttClientService : BackgroundService
    {
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                await Handle_Received_Application_Message();
            }
        }

        public static async Task Handle_Received_Application_Message()
        {

            var mqttFactory = new MqttFactory();

            using (var mqttClient = mqttFactory.CreateMqttClient())
            {
                var mqttClientOptions = new MqttClientOptionsBuilder()
                    .WithTcpServer("test.mosquitto.org")
                    .Build();

                // Setup message handling before connecting so that queued messages
                // are also handled properly. 
                mqttClient.ApplicationMessageReceivedAsync += e =>
                {
                    Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
                    Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");

                    // Publish successful message in response
                    var applicationMessage = new MqttApplicationMessageBuilder()
                        .WithTopic("keipalatest/1/resp")
                        .WithPayload("OK")
                        .Build();

                    mqttClient.PublishAsync(applicationMessage, CancellationToken.None);

                    Console.WriteLine("MQTT application message is published.");

                    return Task.CompletedTask;
                };

                await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);

                var mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
                    .WithTopicFilter(f =>
                    {
                        f.WithTopic("keipalatest/1/post");
                        f.WithAtLeastOnceQoS();
                    })
                    .Build();

                await mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None);

                Console.WriteLine("MQTT client subscribed to topic.");
                // The line below feels like a hack to keep background service from restarting
                Console.ReadLine();
            }
        }
    }
}

Program.cs

using MqttClientAspNetCore.Services;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddHostedService<MqttClientService>();

var app = builder.Build();

// To check if web server is still responsive
app.MapGet("/", () =>
{
    return "Hello World";
});


app.Run();
keipala
  • 768
  • 2
  • 7
  • 19
  • You don't need `Console.WriteLine` in the first place. That's only used to show that the *API* is running. It has nothing to do with the background service itself. You could remove `app.MapGet` completely, or use a generic instead of web host, with `Host.CreateDefaultBuilder()`. The background service runs because `app.Run` was called. It will keep running until the application is terminated – Panagiotis Kanavos Sep 15 '22 at 15:41
  • Sorry I should have been more clear. I was referring to the `Console.WriteLine` on the last line on the BackgroundService, not in Program.cs. I have removed it for clarity. Thanks. The purpose of `app.MapGet` is for me to make sure the server is still responsive, since the final goal is to have the background task running in parallel with a web server. – keipala Sep 15 '22 at 16:36
  • You don't need Console.WriteLine or ReadLine in the first place. [The service won't terminate if `ExecuteAsync` returns](https://github.com/dotnet/runtime/blob/main/src/libraries/Microsoft.Extensions.Hosting.Abstractions/src/BackgroundService.cs#L39). You can set app the client and handler in `ExecuteAsync` without a loop. – Panagiotis Kanavos Sep 16 '22 at 06:44
  • I posted an answer that shows how you can simplify the code, reducing `ExecuteAsync` to essentially two lines, and handle termination without awaiting the stop token – Panagiotis Kanavos Sep 16 '22 at 07:35

2 Answers2

3

If your service has nothing else useful to do, it can just wait for the CancellationToken to fire:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
  try
  {
    await Handle_Received_Application_Message(stoppingToken);
  }
  catch (OperationCanceledException) { }
}

public static async Task Handle_Received_Application_Message(CancellationToken cancellationToken)
{
  ...
  Console.WriteLine("MQTT client subscribed to topic.");
  await Task.Delay(Timeout.Infinite, cancellationToken);
}
Stephen Cleary
  • 437,863
  • 77
  • 675
  • 810
1

There's no need for Console.ReadLine or even the loop. The BackgroundService application code won't terminate when ExecuteAsync returns. If you want the application to terminate when ExecuteAsync terminates you have to actually tell it to through the IApplicationLifecycle interface.

I've found this the hard way the first time I tried using a Generic host for a command line tool. Which seemed to hang forever ....

ExecuteAsync can be used to set up the MQTT client and the event handler and just let them work. The code terminates only when StopAsync is called. Even then, this is done by signaling a cancellation token, not by aborting some worker thread.

The client itself can be built in the constructor, eg using configuration settings. Only ConnectAsync needs to be called in ExecuteAsync.

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{

    await _client.ConnectAsync(_clientOptions, CancellationToken.None);
    _logger.LogInformation("Connected");

    await _client.SubscribeAsync(_subscriptionOptions, CancellationToken.None);
    _logger.LogInformation("Subscribed");
}

The service code stops when StopAsync is called and the cancellation token is triggered. stoppingToken.Register could be used to call _client.DisconnectAsync when that happens, but Register doesn't accept an asynchronous delegate. A better option is to override StopAsync itself :

public virtual async Task StopAsync(CancellationToken cancellationToken)
{
    await _client.DisconnectAsync();
    await base.StopAsync(cancellationToken);
}

The constructor can create the client and register the message handler

public class MqttClientService : BackgroundService
{
    ILogger<MqttClientService> _logger;
    IMqttClient _client=client;

    MqttClientOptions _clientOptions;
    MqttSubscriptionOptions _subscriptionOptions;    
    string _topic;

    public MqttClientService(IOptions<MyMqttOptions> options, 
                            ILogger<MqttClientService> logger)
    {
        _logger=logger;
        _topic=options.Value.Topic;
        var factory = new MqttFactory();
        _client = factory.CreateMqttClient();
        _clientOptions = new MqttClientOptionsBuilder()
                        .WithTcpServer(options.Value.Address)
                        .Build();
        _subscriptionOptions = factory.CreateSubscribeOptionsBuilder()
                    .WithTopicFilter(f =>
                    {
                        f.WithTopic(options.Value.Topic);
                        f.WithAtLeastOnceQoS();
                    })
                    .Build();
        _client.ApplicationMessageReceivedAsync += HandleMessageAsync;
    }

Received messages are handled by the HandleMessageAsync method :

async Task HandleMessageAsync(ApplicationMessageProcessedEventArgs e)
{
    var payload=Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
    _logger.LogInformation("### RECEIVED APPLICATION MESSAGE ###\n{payload}",payload);
    var applicationMessage = new MqttApplicationMessageBuilder()
                    .WithTopic(_topic)
                    .WithPayload("OK")
                    .Build();

    await _client.PublishAsync(applicationMessage, CancellationToken.None);

    _logger.LogInformation("MQTT application message is published.");
}

Finally, since BackgroundService implements IDisposable, we can use Dispose to dispose the _client instance :

public void Dispose()
{
    Dispose(true);
}

protected virtual Dispose(bool disposing)
{
    if(disposing)
    {
        _client.Dispose();
        base.Dispose();
    }
    _client=null;
}
Panagiotis Kanavos
  • 120,703
  • 13
  • 188
  • 236
  • This answer works perfectly and is more correct for what I was trying to accomplish, thank you! To get it to work for me I changed `ApplicationMessageProcessedEventArgs` to `MqttApplicationMessageReceivedEventArgs` since I am not using the MQTTNet ManagedClient package, and added `void` to the `protected virtual Dispose(bool disposing) method`. – keipala Sep 16 '22 at 09:55