I want to build an mqttnet broker that support high availability.I want to add the redundancy feature to my servers. I am building a mqtt broker with 2 nodes and i want to replicate messages between the 2 nodes. The nodes are servers with some basic features, they just receive and display messages from the client. Here is the code of one of the servers :
using System.Text;
using System.Text.Json;
using MQTTnet;
// using MQTTnet.Protocol;
using MQTTnet.Server;
using static System.Console;
namespace Server2;
internal abstract class Server2
{
static string storePath = "Server2/RetainedMessages.json";
private static async Task Main(string[] args)
{
// Create the options for our MQTT Broker
var options = new MqttServerOptionsBuilder()
// set endpoint to localhost
.WithDefaultEndpoint()
.WithDefaultEndpointPort(1884).WithPersistentSessions();
// creates a new mqtt server
var server = new MqttFactory().CreateMqttServer(options.Build());
server.LoadingRetainedMessageAsync += ServerOnLoadingRetainedMessageAsync;
// start the server with options
await server.StartAsync();
// handler for new connections
server.ClientConnectedAsync += ClientConnectedEventArgs;
// handler for new messages
server.InterceptingPublishAsync += ServerInterceptingPublishAsync;
// keep application running until user press a key
WriteLine("Press any key to stop the server...");
ReadLine();
//handlers implementation
Task ServerInterceptingPublishAsync(InterceptingPublishEventArgs arg)
{
// Convert Payload to string
var payloadSegment = arg.ApplicationMessage.PayloadSegment;
var payload = Encoding.UTF8.GetString(payloadSegment.Array!, payloadSegment.Offset, payloadSegment.Count);
WriteLine(
" TimeStamp: {0} -- Message: ClientId = {1}, Topic = {2}, Payload = {3}, QoS = {4}, Retain-Flag = {5}",
DateTime.Now,
arg.ClientId,
arg.ApplicationMessage?.Topic,
payload,
arg.ApplicationMessage?.QualityOfServiceLevel,
arg.ApplicationMessage?.Retain);
return Task.CompletedTask;
}
static Task ClientConnectedEventArgs(ClientConnectedEventArgs arg)
{
WriteLine("New connection: ClientId = {0}, Endpoint = {1}", arg.ClientId, arg.Endpoint);
return Task.CompletedTask;
}
}
private static async Task ServerOnLoadingRetainedMessageAsync(LoadingRetainedMessagesEventArgs arg)
{
var models = arg.LoadedRetainedMessages.Select(MqttRetainedMessageModel.Create);
var buffer = JsonSerializer.SerializeToUtf8Bytes(models);
await File.WriteAllBytesAsync(storePath, buffer);
WriteLine("Retained messages saved.");
}
}