1

I want to build an mqttnet broker that support high availability.I want to add the redundancy feature to my servers. I am building a mqtt broker with 2 nodes and i want to replicate messages between the 2 nodes. The nodes are servers with some basic features, they just receive and display messages from the client. Here is the code of one of the servers :

using System.Text;
using System.Text.Json;
using MQTTnet;
// using MQTTnet.Protocol;
using MQTTnet.Server;
using static System.Console;

namespace Server2;

internal abstract class Server2
{
    static string storePath = "Server2/RetainedMessages.json";

    private static async Task Main(string[] args)
    {
// Create the options for our MQTT Broker
        var options = new MqttServerOptionsBuilder()
            // set endpoint to localhost
            .WithDefaultEndpoint()
            .WithDefaultEndpointPort(1884).WithPersistentSessions();

// creates a new mqtt server     
        var server = new MqttFactory().CreateMqttServer(options.Build());
        
        
        server.LoadingRetainedMessageAsync += ServerOnLoadingRetainedMessageAsync;

// start the server with options  
        await server.StartAsync(); 
        
// handler for new connections
        server.ClientConnectedAsync += ClientConnectedEventArgs;
        
// handler for new messages
        server.InterceptingPublishAsync += ServerInterceptingPublishAsync;

// keep application running until user press a key
        WriteLine("Press any key to stop the server...");
        ReadLine();

//handlers implementation

        Task ServerInterceptingPublishAsync(InterceptingPublishEventArgs arg)
        {
            // Convert Payload to string
            var payloadSegment = arg.ApplicationMessage.PayloadSegment;
            var payload = Encoding.UTF8.GetString(payloadSegment.Array!, payloadSegment.Offset, payloadSegment.Count);

            WriteLine(
                " TimeStamp: {0} -- Message: ClientId = {1}, Topic = {2}, Payload = {3}, QoS = {4}, Retain-Flag = {5}",
                DateTime.Now,
                arg.ClientId,
                arg.ApplicationMessage?.Topic,
                payload,
                arg.ApplicationMessage?.QualityOfServiceLevel,
                arg.ApplicationMessage?.Retain);

            return Task.CompletedTask;
        }
        
        
        static Task ClientConnectedEventArgs(ClientConnectedEventArgs arg)
        {
            WriteLine("New connection: ClientId = {0}, Endpoint = {1}", arg.ClientId, arg.Endpoint);
            return Task.CompletedTask;
        }

    }

    private static async Task ServerOnLoadingRetainedMessageAsync(LoadingRetainedMessagesEventArgs arg)
    {
        var models = arg.LoadedRetainedMessages.Select(MqttRetainedMessageModel.Create);
        var buffer = JsonSerializer.SerializeToUtf8Bytes(models);
        await File.WriteAllBytesAsync(storePath, buffer);
        WriteLine("Retained messages saved.");
    }
}

0 Answers0