tl;dr: How can I avoid disposing the MQTTnet client while it is in use on another thread? Perhaps this pertains to any IDisposable
, but in the case of ManagedMqttClient
, there are also calls like IsConnected
to worry about before async calls.
Note: We are on MQTTnet v3.0.16. I'm open to answers that include "upgrade to latest, then use approach X"
I inherited an application which uses the ManagedMqttClient
and originally replaced/disposed that client when the user made changes to broker settings:
using MQTTnet;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Extensions.ManagedClient;
using System;
using System.Threading.Tasks;
internal class OriginalApproach
{
private IManagedMqttClient _mqttClient;
private static MqttFactory _factory;
public OriginalApproach()
{
_mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(MqttClientDisconnectedEventArgs => OnDisconnect(MqttClientDisconnectedEventArgs));
}
//Called if the user changes settings that affect the way we connect
//to the broker.
public async void OnSettingsChange()
{
if (_mqttClient != null && _mqttClient.IsConnected)
{
StopAsync();
return;
}
//Disposal isn't the only thread safety issue
if (_mqttClient != null && _mqttClient.IsStarted)
{
await Reconnect(TimeSpan.FromSeconds(2));
}
}
public async void StopAsync()
{
if (_mqttClient != null)
{
await _mqttClient.StopAsync();
await Task.Delay(System.TimeSpan.FromSeconds(2));
}
}
public async void OnDisconnect(MqttClientDisconnectedEventArgs e)
{
await Reconnect(TimeSpan.FromSeconds(5));
}
public async Task Reconnect(TimeSpan delay)
{
StopAsync();
await Task.Delay(delay);
Connect();
}
public async void Connect()
{
await CreateManagedClient();
try
{
if (!_mqttClient.IsConnected && !_mqttClient.IsStarted)
{
StartAsync();
}
}
catch (MQTTnet.Exceptions.MqttCommunicationException ex) { /* ... */ }
catch (MQTTnet.Exceptions.MqttProtocolViolationException ex) { /* ... */ }
}
public async Task<bool> CreateManagedClient()
{
try
{
if (_mqttClient != null)
_mqttClient.Dispose();
_factory = new MqttFactory();
_mqttClient = _factory.CreateManagedMqttClient();
await Task.Delay(System.TimeSpan.FromSeconds(2));
}
catch (Exception e)
{
_mqttClient.Dispose();
_mqttClient = null;
return false;
}
return true;
}
public async void StartAsync()
{
MqttApplicationMessage mess = new MqttApplicationMessage();
mess.Payload = BuildDeathCertificate();
mess.Topic = "...";
MqttClientOptionsBuilder clientOptionsBuilder = new MqttClientOptionsBuilder();
IMqttClientOptions options = clientOptionsBuilder.WithTcpServer("Broker Address", 1234)
.WithClientId("ABCD")
.WithCleanSession(true)
.WithWillMessage(mess)
.WithKeepAlivePeriod(new System.TimeSpan(1234))
.WithCommunicationTimeout(new System.TimeSpan(int.MaxValue))
.Build();
var managedClientOptions = new ManagedMqttClientOptionsBuilder()
.WithClientOptions(options)
.Build();
if (!_mqttClient.IsStarted && !_mqttClient.IsConnected)
{
try
{
await _mqttClient.StartAsync(managedClientOptions);
}
catch (Exception e) { /* ... */ }
}
}
byte[] BuildDeathCertificate()
{
return new byte[1234];
}
public async void PublishMessage(byte[] payloadBytes)
{
var message = new MqttApplicationMessageBuilder()
.WithTopic("...")
.WithPayload(payloadBytes)
.WithExactlyOnceQoS()
.WithRetainFlag(false)
.Build();
try
{
await _mqttClient.PublishAsync(message);
}
catch (NullReferenceException e) { /* ... */ }
}
}
Obviously, there are numerous thread-safety issues here, and various situations have yielded ObjectDisposed
exceptions.
I played with using a single ManagedMqttClient
for the lifetime of the application:
internal class SingleClientTest
{
private IManagedMqttClient _mqttClient;
public SingleClientTest()
{
var factory = new MqttFactory();
//Used for lifetime of application
_mqttClient = factory.CreateManagedMqttClient();
}
public async void Connect()
{
//No longer calling CreateManagedClient() here
try
{
if (!_mqttClient.IsConnected && !_mqttClient.IsStarted)
{
StartAsync();
}
}
catch (MQTTnet.Exceptions.MqttCommunicationException ex) { /* ... */ }
catch (MQTTnet.Exceptions.MqttProtocolViolationException ex) { /* ... */ }
}
//The other methods are mostly unchanged
}
Overall it solves the ObjectDisposed
issue, but it doesn't address thread-safety of calling IsConnected
before the async calls. And, given that MqttFactory
exists, reusing one client feels like a hack. Also, I've run into one use case that acts a bit like this issue. Specifically, StartAsync()
yielded the exception "The managed client is already started" despite IsStarted
being false. I can provide more detail if desired, but for now I'll avoid muddying the question.
I also explored adding lock
s around calls to the client, but they cannot be used around awaited calls because of deadlock risk.
Finally, I've read through the MQTTnet samples, wiki, a few of the issues, and poked around the code a bit. So far, I haven't found additional concurrency mechanisms in the library.
I'm exploring a few options (perhaps a combination of these):
- Using
SemaphorSlim
around all calls to the client, as described here - It looks like it may work aroundawait
ed calls. Not sure if this would introduce new timing issues, and given that we are on .NET Framework, use appears to come with risks - Using
MqttClient
, as opposed toManagedMqttClient
. This thread makes it sound likeMqttClient
is preferred. Should I be using it instead? Is it reasonable to use oneMqttClient
for the life of the app (usingDisconnectAsync()/ConnectAsync()
when broker settings change)? (This still doesn't address checks like_mqttClient.IsConnected
) - Surround every call to the client object with a
try/catch
forObjectDisposed
exceptions, and replace the client like this:
var oldClient = _mqttClient
_mqttClient = _factory.CreateManagedMqttClient();
oldClient?.Dispose();
Again, this doesn't address checks like _mqttClient.IsConnected
.
Just wondering if anyone can offer insight as to the commonly accepted way of doing this.