
I was wondering if someone can please help with the following situation:

I cannot solve a memory leak in a RabbitMQ publisher written in C# and running on .NET 5.0.

This is the csproj file :

<Project Sdk="Microsoft.NET.Sdk">   
  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net5.0</TargetFramework>
    <RuntimeIdentifier>win-x64</RuntimeIdentifier>
  </PropertyGroup>
  ...
</Project>  

I have a .NET console application running inside a virtual machine. It connects to a server through an API (a registered 64-bit DLL, referenced as a COM reference), gets information from that server, and then publishes this information to RabbitMQ machines located in the AWS cloud (a load balancer with several nodes for this RMQ instance).

Accessing the API in the code is done in the following way :

        private void SetUpApi () {
            Utils.log.Info (_api.SetAPIOptions ("<CONNECTIONOPTIONS><CALCULATED_PRICES Enabled='true' MaximumDepth='4'/></CONNECTIONOPTIONS>"));
            _api.OnServerConnect += OnServerConnect;
            _api.OnServerDisconnect += OnServerDisconnect;
            _api.OnNewData += OnNewData;            
        }
        private void OnNewData(string strXML){
            try{
                if (strXML.Contains("<ORDER")){
                    ParseXMLAnswer(strXML, "OnNewData ()");
                }
            }
            catch (Exception ex) {
                if (ex.InnerException is AlreadyClosedException || ex.InnerException is BrokerUnreachableException)
                    Utils.log.Error("OnNewData () RabbitMQ.Client.Exceptions.AlreadyClosedException ");
                else
                    Utils.printException("OnNewData ()", ex);
            }
        }
        
        private void ParseXMLAnswer(string strOutputXML, string caller) {
            XmlDocument doc = new XmlDocument();
            doc.LoadXml(strOutputXML);
            string jsonText = JsonConvert.SerializeXmlNode(doc);
            var o = JObject.Parse(jsonText);

            if (o["APIDATA"]["ORDER"].Type is JTokenType.Object){
                JObject order = (JObject)o["APIDATA"]["ORDER"];

                SendOrderToRMQ(order);
            }
            else if (o["APIDATA"]["ORDER"].Type is JTokenType.Array){
                JArray orders = (JArray)o["APIDATA"]["ORDER"];
                foreach (var item in orders.Children()){
                    SendOrderToRMQ((JObject)item);
                }
            }
            doc = null;
            GC.Collect();
            GC.WaitForPendingFinalizers();
        }

        private void SendOrderToRMQ (JObject order){
            JObject instrSpeciefier = (JObject) order["INSTSPECIFIER"];

            var firstSeqID = instrSpeciefier.GetValue("@FirstSequenceID").ToString();
            var firstSeqItemID = instrSpeciefier.GetValue("@FirstSequenceItemID").ToString();
                       
            if (sequenceItemsHashed.ContainsKey(firstSeqID) &&
                sequenceItemsHashed[firstSeqID].Contains(firstSeqItemID)){
                string itemName = Utils.ReplaceSensitiveCharacthers(instrSpeciefier.GetValue("@FirstSequenceItemName").ToString());
                string instrumentName = Utils.ReplaceSensitiveCharacthers(instrSpeciefier.GetValue("@InstName").ToString());

                int index = sequenceItemsHashed[firstSeqID].IndexOf(firstSeqItemID) + 1;
                var binding = instrumentName + "." + sequencesFromInstruments[firstSeqID] + "." + itemName + "." + index;

                serviceInstance1.Publish(
                   order.ToString(),
                   _exchangeName,
                   "",
                   binding);
            }
            order = null;
            instrSpeciefier = null;           
        }

During peak business hours I get around 400-500 messages per second from the API, in the form of XML messages. A single message can contain several orders, as shown in the example below: one element can contain an action to insert (create) an order and another to remove a certain order.

<?xml version="1.0" encoding="utf-16"?>
  <APIDATA xmlns="api-com">
  <ORDER EngineID="0" PersistentOrderID="2791" ...>
    <INSTSPECIFIER InstID="287" ... />
    ...
  </ORDER>
  <ORDER EngineID="0" PersistentOrderID="9840" ...>
    <INSTSPECIFIER InstID="288" ... />
    ...
  </ORDER>
  </APIDATA>

The RabbitMQ server is already configured, and I connect to it using SSL (with a certificate and a key for it). I connect to RMQ using RabbitMQ.Client v6.2.1. The exchange, the queues and the bindings are already defined in RabbitMQ; my producer application only connects to it and starts publishing.


It is important that I use a synchronous publishing method, as the order of the messages we get is very important. For example, in one message we get an action to create an order, and immediately afterwards comes another message telling us to remove the same order. If I used an async method to publish to RMQ, I could possibly get the removal action before the insert action.

  <?xml version="1.0" encoding="utf-16"?>
  <APIDATA xmlns="api-com">
  <ORDER ... PersistentOrderID="2791" OrderID="1234" ... Action="Insert" ...>
      ...
  </ORDER>
  </APIDATA>

The removal messages:

  <?xml version="1.0" encoding="utf-16"?>
  <APIDATA xmlns="api-com">
  <ORDER ... PersistentOrderID="2791" OrderID="1234" ... Action="Remove" ...>
      ...
  </ORDER>
  </APIDATA>
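
For reference, publisher confirms are one way to keep each publish synchronous at the broker level while staying on a single channel. Below is a minimal sketch (not the code used in this app); ConfirmSelect and WaitForConfirmsOrDie are standard RabbitMQ.Client IModel calls, while channel, exchangeName, routingKey, properties and body are placeholders:

// Minimal sketch: synchronous, broker-confirmed publishing on one channel.
// ConfirmSelect() is called once per channel; WaitForConfirmsOrDie() blocks
// until the broker acks the message (and throws on nack or timeout).
channel.ConfirmSelect();
channel.BasicPublish(exchangeName, routingKey, properties, body);
channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));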

For publishing to RMQ I use an object pool (Microsoft provides a package named Microsoft.Extensions.ObjectPool), following the method described here: https://www.c-sharpcorner.com/article/publishing-rabbitmq-message-in-asp-net-core/ .

I'm using the following code:

class RabbitManager : IRabbitManager
{
    private readonly DefaultObjectPool<IModel> _objectPool;
    public RabbitManager(IPooledObjectPolicy<IModel> objectPolicy){
        _objectPool = new DefaultObjectPool<IModel>(objectPolicy, Environment.ProcessorCount * 2);
    }

    public void Publish<T>(T message, string exchangeName, string exchangeType, string routeKey) where T : class {
        if (message == null)
            return;

        var channel = _objectPool.Get();
        try{
            var sendBytes = Encoding.UTF8.GetBytes(message.ToString());
            var properties = channel.CreateBasicProperties();
            properties.ContentType = "application/json";
            properties.DeliveryMode = 1; // Doesn't persist to disk
            properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());

            channel.BasicPublish(exchangeName, routeKey, properties, sendBytes);
        }
        catch (Exception) {
            throw; // rethrow as-is; "throw ex;" would reset the stack trace
        }
        finally {
            _objectPool.Return(channel);
        }
    }
}
public class RabbitModelPooledObjectPolicy : IPooledObjectPolicy<IModel>
{
    private readonly RabbitOptions _options;
    private readonly IConnection _connection;

    public RabbitModelPooledObjectPolicy(RabbitOptions options){
        _options = options;
        _connection = GetConnection();
    }

    private IConnection GetConnection() {
        var factory = new ConnectionFactory() {
            HostName = _options.HostName,
            UserName = _options.UserName,
            Password = _options.Password,
            //Port = _options.Port,
            VirtualHost = _options.VHost,
        };

        if (!String.IsNullOrEmpty(_options.CertPath))
        {
            factory.RequestedConnectionTimeout = TimeSpan.FromMilliseconds(5000);
            factory.Ssl.AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateChainErrors;
            factory.Ssl.CertificateValidationCallback += new RemoteCertificateValidationCallback(ValidateServerCertificate);
            factory.Ssl.ServerName = _options.HostName;
            factory.Ssl.CertPath = _options.CertPath;
            factory.Ssl.CertPassphrase = _options.CertPass;
            factory.Ssl.Version = SslProtocols.Tls12;
            factory.Ssl.Enabled = true;
        }

        factory.RequestedHeartbeat = TimeSpan.FromSeconds(1);
        factory.AutomaticRecoveryEnabled = true;        // enable automatic connection recovery
        factory.RequestedChannelMax = 32;

        var connection = factory.CreateConnection();
        connection.ConnectionShutdown += Connection_ConnectionShutdown;

        return connection;
    }

    private void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e){
        Utils.log.Info("Connection broke!");
    }

    private bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors){
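        // NB: returning true unconditionally accepts any certificate, i.e. server certificate validation is effectively disabled here.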
        return true;
    }

    public IModel Create(){
        return _connection.CreateModel();
    }

    public bool Return(IModel obj) {
        if (obj.IsOpen) {
            return true;
        }
        else {
            obj?.Dispose();
            return false;
        }
    }
}
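
For context, the pool is wired up and used roughly like this (an illustrative sketch, not the exact wiring; rabbitOptions and the exchange/binding values are placeholders, while serviceInstance1 matches the name used in SendOrderToRMQ above):

// Illustrative wiring in the console app (names and values are placeholders).
var policy = new RabbitModelPooledObjectPolicy(rabbitOptions);
IRabbitManager serviceInstance1 = new RabbitManager(policy);

// Later, from SendOrderToRMQ():
serviceInstance1.Publish(order.ToString(), _exchangeName, "", binding);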

Below is a screenshot of the problem, the constant memory increase:

[screenshot: process memory climbing steadily]

This is the stack trace of a memory snapshot taken just after the above screenshot:

[screenshot: memory snapshot stack trace]

Just after the screenshot above was taken, I got the following error message in my program's console:

26-04-2021 10:41:48 - OnNewData () RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=0, text='End of stream', classId=0, methodId=0, cause=System.IO.EndOfStreamException: Reached the end of the stream. Possible authentication failure.
   at RabbitMQ.Client.Impl.InboundFrame.ReadFrom(Stream reader, Byte[] frameHeaderBuffer)
   at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()
   at RabbitMQ.Client.Framing.Impl.Connection.MainLoop()
   at RabbitMQ.Client.Framing.Impl.Connection.EnsureIsOpen()
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.CreateModel()
   at RabbitModelPooledObjectPolicy.Create() in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\RabbitMQ\RabbitModelPooledObjectPolicy.cs:line 77
   at Microsoft.Extensions.ObjectPool.DefaultObjectPool`1.Create()
   at Microsoft.Extensions.ObjectPool.DefaultObjectPool`1.Get()
   at RabbitManager.Publish[T](T message, String exchangeName, String exchangeType, String routeKey) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\RabbitMQ\RabbitManager.cs:line 32
   at ConsoleApp1.Service1.SendOrderToRMQ(JObject order) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 411
   at ConsoleApp1.Service1.ParseXMLAnswer(String strOutputXML, String caller) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 372
   at ConsoleApp1.Service1.OnNewData(String strXML) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 348
(the same error was logged a second time immediately after)

And this leads to the final crash of the program:

[screenshot: the program crashing with an out-of-memory error]

What I did to try to prevent the memory leak:

  1. The classes implement IDisposable:
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
            {
                // free managed resources
                _onTimePerHour.Dispose();
                _api.OnNewData -= OnNewData;
            }
            // free native resources if there are any.
        }
  2. Forced garbage collection after each message received:
private void ParseXMLAnswer(string strOutputXML, string caller) {
            ...
            doc = null;
            GC.Collect();
            GC.WaitForPendingFinalizers();
}

This helps a bit; the memory still increases, but now over a longer period of time.

  3. I used the ReSharper plugin for Visual Studio to get a better understanding of the stack trace of the memory problem, but it does not help much.


What I think the problem is:

The RabbitMQ producer app gets many messages per second, which are then parsed, split into several JSON messages and sent to RMQ using the same channel. The following might be going on:

  • I'm publishing on a single RMQ channel and should somehow use several channels (one connection but multiple channels);
  • I'm receiving more messages than I can parse and send through RMQ using the RabbitMQ.Client .NET library;
  • I'm holding references in memory to some objects (maybe messages) which do not get freed up.

Has anyone had this problem before? I cannot find any info anywhere on this "SingleProducerSingleConsumerQueue+Segment<Memory>" out-of-memory issue.

Does anyone know how to analyse this problem in more depth?

Big thanks!


Edit 1

I guess more info is needed to solve this memory issue.

I have several consumers which consume data from RabbitMQ (NodeJS and Python apps, for example). Each consumer needs different data, and I cannot modify and restart my RabbitMQ producer each time I add a new consumer app, so I need to design the producer to publish its messages in a generic way.

For example, each consumer has its own dedicated queue, with dedicated bindings. Let's say I have consumer1 with queue cons1 and bindings:

  • marketName.productName.*.1 (the productName part corresponds to days).

This binding is dynamic: for now it corresponds to Monday (04 April), but tomorrow it will correspond to Tuesday (05 April).

So I need to store the marketNames and productNames in memory, using:

private static readonly Dictionary<string, List<string>> sequenceItemsHashed = new Dictionary<string, List<string>>();
private static readonly Dictionary<string, string> sequencesFromInstruments = new Dictionary<string, string>();

Note that sequenceItemsHashed corresponds to marketNames and sequencesFromInstruments to productNames in my logic.

This way I send all the messages to RMQ and I do the sorting in RMQ afterwards using bindings.
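
For illustration, the consumer-side binding for such a queue would be something like the following (a sketch with a hypothetical exchange name; QueueBind is the standard RabbitMQ.Client call, and "*" matches exactly one dot-separated segment):

// Sketch: consumer-side topic binding (the exchange name is hypothetical).
// The trailing "1" is the day index that shifts every day.
channel.QueueBind("cons1", "orders", "marketName.productName.*.1");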


Edit 2

From what I understood, in order to solve my problem I need something like the following architecture:


So: multiple threads sharing my single connection to the RMQ server, with one channel per thread.
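
A minimal sketch of that idea, assuming one shared IConnection (untested; the class name is illustrative). Each thread lazily creates and keeps its own IModel, since IModel instances must not be shared between threads:

using System.Threading;
using RabbitMQ.Client;

public class ChannelPerThreadPublisher
{
    // One IModel per publishing thread; channels are not thread-safe.
    private readonly ThreadLocal<IModel> _channel;

    public ChannelPerThreadPublisher(IConnection connection)
    {
        _channel = new ThreadLocal<IModel>(() => connection.CreateModel(), trackAllValues: true);
    }

    public void Publish(byte[] body, string exchange, string routingKey)
    {
        // Each calling thread transparently publishes on its own channel.
        _channel.Value.BasicPublish(exchange, routingKey, null, body);
    }
}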


Edit 3

I successfully implemented the pipelines, the ConcurrentQueue and the consumer thread which pushes to RMQ, but I still have memory issues:

private readonly TransformBlock<string, string> orderFilter;
private readonly TransformBlock<string, JObject> xmlParser;
//private readonly TransformBlock<XmlDocument, JObject> xmlToJsonTransformer;
private readonly TransformManyBlock<JObject, JToken> jsonOrderFactory;
private readonly ActionBlock<JToken> messageSender;

ConcurrentQueue<JToken> concurrentQueue = new ConcurrentQueue<JToken>();

public Service1 (string [] args) {
    ...
    // setup pipeline blocks
    orderFilter = new TransformBlock<string, string>(FilterIncomingMessages);
    xmlParser = new TransformBlock<string, JObject>(ParseXml);
    jsonOrderFactory = new TransformManyBlock<JObject, JToken>(CreateOrderMessages);
    messageSender = new ActionBlock<JToken>(SendMessage);

    // build your pipeline            
    orderFilter.LinkTo(xmlParser, x => !string.IsNullOrEmpty(x));
    orderFilter.LinkTo(DataflowBlock.NullTarget<string>()); // for non-order msgs

    xmlParser.LinkTo(jsonOrderFactory);
    jsonOrderFactory.LinkTo(messageSender, new DataflowLinkOptions { PropagateCompletion = true });

    Task t2 = Task.Factory.StartNew(() =>
            {
                while (true) { 
                    if (!concurrentQueue.IsEmpty)
                    {
                        JToken number;
                        while (concurrentQueue.TryDequeue(out number))
                        {
                            _rabbitMQ.PublishMessages(
                                Encoding.ASCII.GetBytes(number.ToString()),
                                "test"
                            );
                        }
                    } else
                    {
                        Thread.Sleep(1);
                    }
                }
            });        
     ...
}

private string FilterIncomingMessages(string strXml){
    if (strXml.Contains("<ORDER")) return strXml;
    return null;
}

private JObject ParseXml(string strXml){
    XmlDocument doc = new XmlDocument();
    doc.LoadXml(strXml);
    string jsonText = JsonConvert.SerializeXmlNode(doc);
    var o = JObject.Parse(jsonText);
    return o;
}

private IEnumerable<JToken> CreateOrderMessages(JObject o){
    List<JToken> myList = new List<JToken>();
    if (o.ContainsKey("GV8APIDATA")){
        if (o["GV8APIDATA"]["ORDER"].Type is JTokenType.Object){
            JToken order = o["GV8APIDATA"]["ORDER"];
            myList.Add(order);
        }
        else if (o["GV8APIDATA"]["ORDER"].Type is JTokenType.Array){
            JToken orders = o["GV8APIDATA"]["ORDER"];
            foreach (var order in orders.Children()){
                myList.Add(order);
            }
        }
    }
    return myList.ToArray();
}

private void SendMessage(JToken order){
    concurrentQueue.Enqueue(order);
}

The new solution helps to break the logic into several small parts but I still have a constant memory increase.



Edit 4

Taking into account @Fildor's answer, I did the following:

Instead of converting strings containing XML with <ORDER ...> elements to JSON, I now deserialize the XML to objects, using the pipelines and the code below.

I removed the part with the Thread and the ConcurrentQueue and I'm publishing directly in the last ActionBlock.

This solves my memory leak problem, but there are other problems:

  • If the messages are big enough, I am only able to publish around 120 messages/second. I get a rate of 1780 messages/s if I just publish the simple string "test".


public Service1 (string [] args) {
            ...             
            // setup pipeline blocks
            orderFilter = new TransformBlock<string, string>(FilterIncomingMessages);
            xmlParser = new TransformBlock<string, OrdersResponse>(ParseXml);
            jsonOrderFactory = new TransformManyBlock<OrdersResponse, Order>(CreateOrderMessages);
            messageSender = new ActionBlock<Order>(SendMessage);

            // build your pipeline            
            orderFilter.LinkTo(xmlParser, x => !string.IsNullOrEmpty(x));
            orderFilter.LinkTo(DataflowBlock.NullTarget<string>()); // for non-order msgs
            xmlParser.LinkTo(jsonOrderFactory);
            jsonOrderFactory.LinkTo(messageSender, new DataflowLinkOptions { PropagateCompletion = true });

            RunAsConsole(args);
        }

        private readonly TransformBlock<string, string> orderFilter;
        private readonly TransformBlock<string, OrdersResponse> xmlParser;
        private readonly TransformManyBlock<OrdersResponse, Order> jsonOrderFactory;
        private readonly ActionBlock<Order> messageSender;

        private void OnNewData(string strXML){
            orderFilter.Post(strXML); 
        }        

        private string FilterIncomingMessages(string strXml){
            if (strXml.Contains("<ORDER")) return strXml;
            return null;
        }

        private OrdersResponse ParseXml(string strXml) {
            var rootDataObj = DeserializeOrdersFromXML(strXml);
            return rootDataObj;
        }

        private OrdersResponse DeserializeOrdersFromXML(string strOutputXML){
            var xsExpirations = new XmlSerializer(typeof(OrdersResponse));
            OrdersResponse rootDataObj = null;
            using (TextReader reader = new StringReader(strOutputXML)) {
                rootDataObj = (OrdersResponse)xsExpirations.Deserialize(reader);
                reader.Close();
            }
            return rootDataObj;
        }

        private IEnumerable<Order> CreateOrderMessages(OrdersResponse o){
            return o.orders;
        }

        private void SendMessage(Order order) {
            _rabbitMQ.PublishMessages(
                    Encoding.ASCII.GetBytes(order.ToString()),
                    "test"
                );
        }
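
One note on DeserializeOrdersFromXML above: the typeof-only XmlSerializer constructor caches its generated assembly internally, so it does not leak by itself, but constructing it per message is avoidable. A sketch of reusing a single static instance:

// Sketch: construct the serializer once and reuse it for every message.
private static readonly XmlSerializer OrdersSerializer = new XmlSerializer(typeof(OrdersResponse));

private OrdersResponse DeserializeOrdersFromXML(string strOutputXML){
    using (TextReader reader = new StringReader(strOutputXML)) {
        return (OrdersResponse)OrdersSerializer.Deserialize(reader);
    }
}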

And the Order object looks like:

    [Serializable()]
    [XmlRoot (ElementName = "ORDER")]
    public class Order : IDisposable {

        public void Dispose()
        {
            EngineID = null;
            PersistentOrderID = null;
            ...
            InstrumentSpecifier.Dispose();
            InstrumentSpecifier = null;
            GC.SuppressFinalize(this);
        }

        [XmlAttribute (AttributeName = "EngineID")]
        public string EngineID { get; set; }
        [XmlAttribute (AttributeName = "PersistentOrderID")]
        public string PersistentOrderID { get; set; }
        ... 
        [XmlElement(ElementName = "INSTSPECIFIER")]
        public InstrumentSpecifier InstrumentSpecifier { get; set; }
    }

And my new RabbitMQ class:

public class RMQ : IDisposable {
    private IConnection _connection;
    public IModel Channel { get; private set; }        
    private readonly ConnectionFactory _connectionFactory;
    private readonly string _exchangeName;

    public RMQ (RabbitOptions _rabbitOptions){
        try{
            // _connectionFactory initialization
            _connectionFactory = new ConnectionFactory()
            {
                HostName = _rabbitOptions.HostName,
                UserName = _rabbitOptions.UserName,
                Password = _rabbitOptions.Password,
                VirtualHost = _rabbitOptions.VHost,
            };
            this._exchangeName = _rabbitOptions.ExchangeName;

            if (!String.IsNullOrEmpty(_rabbitOptions.CertPath)){
                _connectionFactory.RequestedConnectionTimeout = TimeSpan.FromMilliseconds(5000);
                _connectionFactory.Ssl.AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateChainErrors;
                _connectionFactory.Ssl.CertificateValidationCallback += new RemoteCertificateValidationCallback(ValidateServerCertificate);
                _connectionFactory.Ssl.ServerName = _rabbitOptions.HostName;
                _connectionFactory.Ssl.CertPath = _rabbitOptions.CertPath;
                _connectionFactory.Ssl.CertPassphrase = _rabbitOptions.CertPass;
                _connectionFactory.Ssl.Version = SslProtocols.Tls12;
                _connectionFactory.Ssl.Enabled = true;
            }

            _connectionFactory.RequestedHeartbeat = TimeSpan.FromSeconds(1);
            _connectionFactory.AutomaticRecoveryEnabled = true;        // enable automatic connection recovery
            //_connectionFactory.RequestedChannelMax = 10;

            if (_connection == null || _connection.IsOpen == false){
                _connection = _connectionFactory.CreateConnection();
                _connection.ConnectionShutdown += Connection_ConnectionShutdown;
            }
            if (Channel == null || Channel.IsOpen == false){
                Channel = _connection.CreateModel();
            }
            Utils.log.Info("ConnectToRabbitMQ () Connecting to RabbitMQ. rabbitMQenvironment = ");
        }
        catch (Exception ex){
            Utils.log.Error("Connection to RabbitMQ failed ! HostName = " + _rabbitOptions.HostName + " VirtualHost = " + _rabbitOptions.VHost);
            Utils.printException("ConnectToRMQ ()", ex);
        }
    }        

    private void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e){
        Utils.log.Info ("Connection broke!");
        try{
            if (ReconnectToRMQ()){
                Utils.log.Info("Connected!");
            }
        }
        catch (Exception ex){
            Utils.log.Info("Connect failed!" + ex.Message);
        }
    }

    private bool ReconnectToRMQ(){
        if (_connection == null || _connection.IsOpen == false){
            _connection = _connectionFactory.CreateConnection();
            _connection.ConnectionShutdown += Connection_ConnectionShutdown;                
        }

        if (Channel == null || Channel.IsOpen == false){
            Channel = _connection.CreateModel();
            return true;
        }
        return false;
    }

    private bool ValidateServerCertificate (object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) {
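        // NB: as above, returning true unconditionally disables server certificate validation.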
        return true;
    }

    public void DisconnectFromRMQ () {
        Channel.Close ();
        _connection.Close ();
    }     

    public void Dispose(){
        try{
            Channel?.Close();
            Channel?.Dispose();
            Channel = null;

            _connection?.Close();
            _connection?.Dispose();
            _connection = null;
        }
        catch (Exception e){
            Utils.log.Error("Cannot dispose RabbitMQ channel or connection" + e.Message);
        }
    }

    public void PublishMessages (byte [] message, string routingKey) {
        if (this._connection == null || ! _connection.IsOpen) {
            Utils.log.Error ("PublishMessages(), Connect failed! this.conn == null || !conn.IsOpen ");
            ReconnectToRMQ();
        }

        // publish only after making sure the connection is open,
        // so messages are not dropped during a reconnect
        var properties = Channel.CreateBasicProperties();
        properties.Persistent = true;

        Channel.BasicPublish (_exchangeName, routingKey, properties, message);
        //serviceInstance1.Publish(message, _rabbitOptions.ExchangeName, "", routingKey);
    }
}

Now the funny thing is that if I publish only a small string like "test" to my predefined queue, I can publish more than 1780 messages/second.

R13mus

  • _"Freeing myself manually the memory by using :"_ - that's not actually what you are doing. You are _suggesting_ to the runtime to perform GC. If you find yourself in the place of thinking you need this, then you have a different problem. And here, it's seemingly allocations and / or references that are held for too long. So, what I'd do is rig up a benchmark with enough data to get realistic measurements. Then tweak all the identified bottlenecks (_one by one_) and compare benchmarks. Keep improvements, ditch changes that didn't improve anything. – Fildor Apr 26 '21 at 11:42
  • And you seem to do all the stuff on the eventhandling thread. Maybe have a look into DataFlow to have your process a little more pipelined? – Fildor Apr 26 '21 at 11:47
  • thanks for your answer @Fildor ! If I don't use the part of "freeing the memory" by using GC.Collect () then the GC will not be able to free up the memory and I will have that peak memory increase until the app crashes. I tried doing smth like "var task = Task.Run(() => ParseXMLAnswer(strXML, "OnNewData ()")).ContinueWith(t => Utils.log.Info("Task done."));" to parse answers on different threads but this kills the program after 30 seconds. Also, RMQ does not like sharing a channel between multiple threads - https://stackoverflow.com/questions/5681118/rabbitmq-channel-creation-guidelines – R13mus Apr 26 '21 at 12:02
  • I do understand that allocations / references are held in memory for too long, but how do I see which one in particular ? because from the stack trace it looks like it is something inside the RabbitMQ.Client library ... – R13mus Apr 26 '21 at 12:04
  • I must admit I don't know enough about your project to deeply analyse that. Simply wrapping a Task around (thus pushing the whole ordeal to the Threadpool) is also not going to help, if you don't really know, what you are doing (no offense). Calling GC.Collect just buys you some time but won't solve your problem. Now ... – Fildor Apr 26 '21 at 12:07
  • ... what I see here are some clearly distinguishable "Steps": 1. Receive Msg, 2. Filter Msg, 3. Transcode XML to JSON, 4. Send Msg. Now, from experience I know: Keep code that runs on an event handler short. So, first thing, I'd do is to have the eventhandler write the data to a queue (that keeps order). Done. Nothing more. Then have a (or more, perhaps) different Threads work that queue (always keeping order). Then, maybe split the following Steps into more queues. That way, you can "fan out" in between. For example: messages that contain more than 1 order, could be processed in parallel. – Fildor Apr 26 '21 at 12:13
  • 1
    Is `SetUpApi` called once at most? If not, how many times is it called? If you're so memory constrained I'm also wondering why you're juggling so many strings. You go from a string to an (old) XmlDocument only to serialize it to a Json string so you can make a JObject from it. I expect you can reduce string size by just going to an XDocument once, find the order XElement node and only then call JsonConvert.SerializeXNode on that single order node. Beyond that you might still need to offload your work from the event thread as indicated by Fidor. – rene Apr 26 '21 at 12:22
  • @Fildor, the solution you gave might work but I see a small problem - I still have to somehow publish to RMQ using different threads and so different channels and this is a problem from what I read, or at least I don't see anywhere a working solution. Also the data volume is 2.8 MB/second of data when I publish it to RMQ – R13mus Apr 26 '21 at 12:24
  • It's not a solution, actually. It would be a "start" towards a solution ;D Your problem has many layers and aspects to it. You shouldn't expect to solve it all with "one" strike. For the Threading: _maybe_ it will be necessary to use mor than one connection but I am not RMQ savvy, so I cannot help with _that_. You also may want to check if that amount of data can actually be handled by your networking setup (it's a cloud env, right?). If your outbound is limited below what you need, of course it will always heap up ... – Fildor Apr 26 '21 at 12:26
  • @rene - thanks for your answer ! you're right, there is a reason behind my current logic - I have multiple consumers (applications written in nodejs, python, etc) which consume data from RMQ. And these bindings are dynamic. For example, I have a dedicated queue with more than 150 bindings like - "marketName.productName.*.1" this 1 represents an index which is valid today for orders which have the timestamp of tomorrow. But tomorrow this index (i.e. 1) will correspond to the next day and so on, so i need to calculate it dynamically in my app. I will write an edit on my post to explain more. – R13mus Apr 26 '21 at 12:31
  • @rene - no, i only call once the setUpApi () method. Then each time there is an event coming from the company which gave me the api the method ParseAnswer () is called – R13mus Apr 26 '21 at 12:32
  • 1
    _"Then each time there is an event coming from the company which gave me the api the method ParseAnswer () is called"_ Exactly. On the same thread one by one. See, where this goes? – Fildor Apr 26 '21 at 12:34
  • Makes sense what you are saying, now in practice how will this work ? Let's say each time I have a message I will try to write to RMQ in a different thread, right ? But this won't work with a single channel, so I need to have a dedicated channel for each thread. And what happens if I get 100 messages / second ? I am allowed to create a max of 32 channels per connection. Then I need to handle those channels, to close them or return them to a pool of channels which can be an expensive operation in terms of memory. I will create an edit with launching different threads for each incoming message. – R13mus Apr 26 '21 at 12:58
  • 1
    No, don't launch 1 thread / message. Just decouple event handling from processing. – Fildor Apr 26 '21 at 13:00
  • I guess it makes sense what you're saying @Fildor. Now, how to do it exactly and check if it works or not ... I have no clue. But I will continue, as it is quite important to get this done as soon as possible. I will write here a new edit if I have some more news. thanks guys ! – R13mus Apr 26 '21 at 13:08
  • 1
    @rene Have you tried taking out 1 type of operation at a time to isolate the leak to one of the stages of your pipeline? Also, a pipeline architecture as mentioned above may help (e.g. using an internal concurrent queue with dedicated consumer thread for the transcode step), another queue for send, etc. Finally, are you sure it's a memory leak or could it be memory *pressure*. These are very different. What happens when the app reverts to a steady state (no messages for a while?)... memory freed or does it stick around? – Kit Apr 26 '21 at 13:28
  • @rene Looks like you made an effort to unregister event handlers... good. Did you get all of them? They're notorious for memory leaks! – Kit Apr 26 '21 at 13:31
  • @Kit I'm not the OP ;) – rene Apr 26 '21 at 13:37
  • Oh my bad!!! @R13mus see above. – Kit Apr 26 '21 at 13:38
  • No @Kit I didn't try with an internal queue as I was afraid that it will cause even more memory issues as I get a lot of data in my producer from the API and I was supposing that a consumer thread would have problems with the RMQ Channels. But I have to try it out ... and post my findings afterwards here. Many thanks ! – R13mus Apr 26 '21 at 14:53

1 Answer


First, it seems you are clogging the event handling thread. So, what I'd do is decouple event handling from the actual processing:

(Untested! Just an outline!)

REMOVED FAULTY CODE

Then in serviceInstance1, I would have Publish enqueue the orders in a BlockingCollection, on which a dedicated Thread is waiting. That thread will do the actual send. So you'll marshal the orders to that thread regardless of what you choose to do in Processor, and all will be decoupled and in-order.
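
A rough sketch of that shape (untested, names illustrative; a fuller version appears in EDIT 2 below):

// Sketch: Publish() only enqueues; one dedicated thread drains the queue
// in-order and owns the single channel, so no IModel is shared across threads.
private readonly BlockingCollection<string> _pending = new BlockingCollection<string>();

public void Publish(string message) => _pending.Add(message);

private void SenderLoop() // started once, on a dedicated Thread
{
    // GetConsumingEnumerable() blocks until an item is available - no spin-wait.
    foreach (var message in _pending.GetConsumingEnumerable())
    {
        SendToRabbit(message); // the actual BasicPublish happens here
    }
}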

You will probably want to set the dataflow block options according to your requirements.

Mind that this is just a coarse outline, not a complete solution. You may also want to go from there and minimize string-operations etc.

EDIT

Some more thoughts that came to me since yesterday in no particular order:

For reference:

In response to EDIT 3 in the question:

Task t2 = Task.Factory.StartNew(() =>
            {
                while (true) { 
                    if (!concurrentQueue.IsEmpty)
                    {
                        JToken number;
                        while (concurrentQueue.TryDequeue(out number))
                        {
                            _rabbitMQ.PublishMessages(
                                Encoding.ASCII.GetBytes(number.ToString()),
                                "test"
                            );
                        }
                    } else
                    {
                        Thread.Sleep(1);
                    }
                }
            });  

is really not a good idea.

First off, I would do that in the service class that handles sending (serviceInstance1 - I don't know the type). Then, you are doing a tight loop with spin-wait while mixing TPL with Thread.Sleep. Those are two no-nos. It also completely defeats the intention of the blocking queue, which is: the thread blocks until an item is available on that queue.

Maybe it's a better idea to drop this part completely for now and just have the final block in the pipeline do the serviceInstance1.Publish. That may have been premature optimization.
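
In other words, something like this for the last block (a sketch reusing the names from EDIT 3/4 in the question; an ActionBlock runs its delegate for one item at a time by default, which preserves order):

// Sketch: publish directly from the pipeline's final block.
messageSender = new ActionBlock<Order>(order =>
    _rabbitMQ.PublishMessages(Encoding.ASCII.GetBytes(order.ToString()), "test"));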

EDIT 2

So, yesterday I did some experimenting and found that this:

using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Xml.Linq;
using System.Xml.Serialization;
using Newtonsoft.Json;
using System.Linq;

namespace DataFlowExperiment.PipelinesLib
{

    public class PipelineOne
    {
        private readonly IPipelineOneSteps steps;

        private readonly TransformBlock<string, XDocument> startBlock; // deserialize the XML into a model
        private readonly TransformManyBlock<XDocument, string> toJsonMessagesBlock; // generate the JSON messages
        private readonly ITargetBlock<string> resultCallback;

        public PipelineOne(IPipelineOneSteps steps, ITargetBlock<string> resultCallback = null)
        {
            this.steps = steps;

            startBlock = new TransformBlock<string, XDocument>(steps.Start);
            toJsonMessagesBlock = new TransformManyBlock<XDocument, string>(steps.ToJson);

            this.resultCallback = resultCallback ?? DataflowBlock.NullTarget<string>();

            startBlock.LinkTo(toJsonMessagesBlock, new DataflowLinkOptions { PropagateCompletion = true });
            toJsonMessagesBlock.LinkTo(this.resultCallback, new DataflowLinkOptions { PropagateCompletion = true }, x => !string.IsNullOrEmpty(x));
            toJsonMessagesBlock.LinkTo(DataflowBlock.NullTarget<string>(), new DataflowLinkOptions { PropagateCompletion = true });
        }

        public void Post(string input)
        {
            startBlock.Post(input);
        }

        public Task Close()
        {
            startBlock.Complete();
            return resultCallback.Completion;
        }
    }

    public interface IPipelineOneSteps
    {
        public XDocument Start(string input);
        public IEnumerable<string> ToJson(XDocument doc);
    }

    public class PipelineOneSteps : IPipelineOneSteps
    {
        private readonly JsonSerializer jsonSerializer;

        public PipelineOneSteps()
        {
            jsonSerializer = JsonSerializer.CreateDefault();
        }

        public XDocument Start(string input)
        {
            XDocument doc = XDocument.Parse(input);
            return doc;
        }

        public IEnumerable<string> ToJson(XDocument doc)
        {
            XNamespace ns = "api-com";
            var orders = doc.Root.Elements(ns + "ORDER");

            foreach (var order in orders)
            {
                yield return JsonConvert.SerializeXNode(order);
            }
        }
    }
}

results in this benchmark:


BenchmarkDotNet=v0.12.1, OS=Windows 10.0.19041.867 (2004/?/20H1)
Intel Core i9-10885H CPU 2.40GHz, 1 CPU, 16 logical and 8 physical cores
.NET Core SDK=5.0.202
  [Host]     : .NET Core 3.1.14 (CoreCLR 4.700.21.16201, CoreFX 4.700.21.16208), X64 RyuJIT
  DefaultJob : .NET Core 3.1.14 (CoreCLR 4.700.21.16201, CoreFX 4.700.21.16208), X64 RyuJIT


Method               |      N |        Mean |     Error |    StdDev | Gen 0 | Gen 1 | Gen 2 | Allocated
-------------------- |------- |------------:|----------:|----------:|------:|------:|------:|----------:
PipeLineOneBenchmark |   1000 |    25.00 μs |  0.269 μs |  0.252 μs |     - |     - |     - |         -
PipeLineOneBenchmark | 100000 | 2,491.42 μs | 13.655 μs | 15.177 μs |     - |     - |     - |         -

Which is similar but different from your solution.

Nevertheless, it makes me think, that the actual problem is elsewhere. (Still working on it and going to update.)

I was thinking of a little tool to see if your rabbit is simply too slow and you have substantial build-up:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace DataFlowExperiment.PipelinesLib
{
    public class RabbitWrapper : IDisposable
    {
        private readonly int batchSize = 10;

        private Thread senderThread;
        private readonly BlockingCollection<string> messages;
        private readonly ActionBlock<string> receiver;
        private readonly CancellationTokenSource stoppingToken;
        private readonly RabbitWrapperStats stats;

        private ITargetBlock<string> Receiver => receiver;

        public RabbitWrapper()
        {
                                                                // Drop in your logging here
            stats = new RabbitWrapperStats(new Progress<string>(x => Console.WriteLine(x)));
            stoppingToken = new CancellationTokenSource();
            messages = new BlockingCollection<string>();
            receiver = new ActionBlock<string>(Receive);
            senderThread = new Thread(HandleQueue);
            senderThread.Start();
        }

        private void Receive(string message)
        {
            messages.Add(message);
        }

        private void HandleQueue()
        {
            try
            {
                while (!stoppingToken.Token.IsCancellationRequested)
                {
                    int batchIndex = 0;
                    do {
                        // Take() blocks until an item is available.
                        string message = messages.Take(stoppingToken.Token);
                        if (!string.IsNullOrEmpty(message))
                        {
                            SendToRabbit(message);
                        }
                        batchIndex++;
                    } while (!stoppingToken.Token.IsCancellationRequested &&
                             batchIndex < batchSize &&
                             messages.Count > 0);
                    // Check statistics after each batch (up to batchSize messages).
                    CheckStats(messages.Count);
                }
            }
            catch (OperationCanceledException)
            {
                // Take() throws when the token is cancelled; expected during Close().
            }
        }

        private void SendToRabbit(string message)
        {
            // rabbit Publish goes here.
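            // e.g. (illustrative only - a channel owned by this thread):
            // _channel.BasicPublish(_exchangeName, "test", null, Encoding.UTF8.GetBytes(message));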
        }

        private void CheckStats(int count)
        {
            stats.CheckStats(count);
        }

        public void Close()
        {
            this.stoppingToken.Cancel();
            senderThread.Join();
        }

        public void Dispose()
        {
            Close();
        }
    }

    internal class RabbitWrapperStats
    {
        // You may want to play around with these thresholds
        // I pulled them out of thin air ...
        const int SIZE_WARN = 500000;
        const int SIZE_CRITICAL = SIZE_WARN * 2;

        private int lastTenIndex = 0;
        private int[] lastTen = new int[10] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
        private int meanSizeLastTen = 0;
        private int lastMeanSize = 0;
        private int tendency = 0;

        private bool HasWarned = false;
        private bool HasPanicked = false;

        private readonly IProgress<string> progress;

        public RabbitWrapperStats(IProgress<string> progress)
        {
            this.progress = progress;
        }

        public void CheckStats(int queueSize)
        {
            UpdateLastTen(queueSize);

            if (!HasPanicked && queueSize > SIZE_CRITICAL)
            {
                Panic(queueSize);
                return;
            }

            if (!HasWarned && queueSize > SIZE_WARN)
            {
                Warn(queueSize);
                return;
            }

            if ((HasPanicked || HasWarned ) && meanSizeLastTen < SIZE_WARN * 0.75)
            {
                HasPanicked = false;
                HasWarned = false;
                progress?.Report($"INFO Mean size of last 10 Samples sinks below {SIZE_WARN * 0.75} : {meanSizeLastTen}");
            }
        }

        private void Warn(int size)
        {
            HasWarned = true;
            progress?.Report($"WARNING QueueSize = {size}");
        }

        private void Panic(int size)
        {
            HasPanicked = true;
            progress?.Report($"!! CRITICAL !! QueueSize = {size}");
        }

        private void UpdateLastTen(int value)
        {
            lastTen[lastTenIndex] = value;
            lastTenIndex = ++lastTenIndex % lastTen.Length;
            meanSizeLastTen = lastTen.Sum() / lastTen.Length;
            tendency = meanSizeLastTen.CompareTo(lastMeanSize);
            lastMeanSize = meanSizeLastTen;
        }
    }
}
Fildor
  • @R13mus This is an implementation of what I was talking about. With each block (stage, or whatever you want to call it) doing only what it needs to as fast as it can, increases the parallelism. It's counterintuitive, but this can often decrease the memory pressure because the resources aren't shared and can be reclaimed apace. It's also easier to reason about each block separately, so it may lead to an aha moment if you're actually leaking memory somewhere. – Kit Apr 26 '21 at 17:08
  • @Fildor - thanks ! I'm currently trying to implement your solution. So, to summarize a bit: I have part one of the process - transforming and publishing orders into a BlockingCollection using the above pipelines - and part two is to consume from that Collection using a dedicated thread which will then send data to RabbitMQ, right ? – R13mus Apr 27 '21 at 05:49
  • That is what I would try, yes. It just may not be enough to solve all of the problems. But it will enable you to tweak each step, so you can further improve memory usage. – Fildor Apr 27 '21 at 06:00
  • The last part of the pipeline does not work (i.e. TransformManyBlock -> ActionBlock). The last method (SendMessage()) does not print anything. Any clues how to fix this? I edited my post with the latest version of the code. – R13mus Apr 27 '21 at 06:22
  • I'll add some more thoughts to the answer, but will have not much time today, so please bear with me. – Fildor Apr 27 '21 at 06:27
  • _"The last part of the pipeline does not work"_ - I think it's a type mismatch. If your second last block output is `IEnumerable` then the input of your last block needs to match (== be also `IEnumerable`. – Fildor Apr 27 '21 at 06:32
  • I do understand that but what I want is to get a JObject as an output (from the JObject which contains a JArray as the input) from the TransformManyBlock and this should be the input of my ActionBlock. I edited my EDIT 3 section with the code. But still I don't get anything printed to the console in my ActionBlock method ... – R13mus Apr 27 '21 at 06:55
  • So, this does not work - jsonOrderFactory = new TransformManyBlock<JObject, JToken>((Func<JObject, JToken>)CreateOrderMessages); as I get the following exception : Argument 1: cannot convert from 'System.Func<JObject, JToken>' to 'System.Func<JObject, IEnumerable<JToken>>' – R13mus Apr 27 '21 at 06:59
  • ok, made it working finally (the pipelines) but i still have memory problems and now they are worst :( please see the EDIT 3 section for the latest code – R13mus Apr 27 '21 at 09:28
  • Solved the part with the new Thread which consumes from my concurrentQueue and publishes to RMQ, but I still have memory problems ... I will post it in my question above. – R13mus Apr 27 '21 at 10:24
  • The part with the pipelines is not the problem, as I do not have memory problems if I just create the pipelines with all the logic (XML -> JSON) and just discard the messages in the ActionBlock. It's the part which publishes to RabbitMQ which is the issue :( – R13mus Apr 27 '21 at 10:36
  • I dropped the second part with the Thread and it does not work, i still have the same problem, publishing to RabbitMQ (i.e. serviceInstance1.Publish) still has memory leaks on the long term ... – R13mus Apr 27 '21 at 13:09
  • Doing some benchmarks right now and I am starting to think, this is something completely different ... – Fildor Apr 27 '21 at 16:14
  • @R13mus Updated answer, if you want to have a look. Still not a final solution, but maybe a step. – Fildor Apr 28 '21 at 06:48
  • I have some updates. First one is that I changed the XML parsing and conversion to JSON and used System.Xml.XmlSerializer to serialize the XML to objects. The problem with XmlDocument was that it was loading in memory all the objects and this was causing a big problem (I will create an update 4 to post my new code). Second thing is if I print something like a normal string to RMQ "test" i get big publishing rates (like 1750m/s). If I publish entire orders it only publishes at 120 m/s. One question which was unclear to me - when do i have to call startBlock.Complete() ? Thanks ! – R13mus Apr 28 '21 at 08:46
  • "Complete()" is kind of a "Poison". It basically tells the block to no longer accept input. So, it's for shutting down. See also [IDataflowBlock.Complete Method](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.idataflowblock.complete?view=net-5.0) – Fildor Apr 28 '21 at 08:48
  • _"If I publish entire orders it only publishes at 120 m/s_" That would explain build-up. Now, my approach would be a) can it be sped-up? and/or b) can it be scaled up? – Fildor Apr 28 '21 at 08:52
  • 1
    So I don't have to call Complete () as I want my app to run for days, weeks, months if possible. Or maybe when I dispose the service. I did put my answer up there EDIT 4. I need to try your EDIT 2 solution :) Big thanks for your patience and help ! – R13mus Apr 28 '21 at 09:25
  • Exactly. Just use it to shut down gracefully. – Fildor Apr 28 '21 at 09:25
  • 1
    Finally managed to solve the problem thanks to your help ! The pipelines helped a lot plus I switched back to using System.Text.Json instead of Newtonsoft.Json. I also reduced the length of the messages and did a bit more filtering and now I have a working solution. Cheers !! – R13mus Apr 28 '21 at 19:18