I was wondering if someone can please help with the following situation:
I cannot solve a memory leak with a RabbitMQ Publisher written in C# and using .Net core 5.0.
This is the csproj file :
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
<RuntimeIdentifier>win-x64</RuntimeIdentifier>
</PropertyGroup>
...
</Project>
I have a .net console application running inside a virtual machine which connects to a server through an API (a registered 64 bit dll and referenced as a COM reference), gets information from that server and then tries to publish this information to a RabbitMQ machines located on an AWS cloud (a load balancer with several nodes for this RMQ instance).
Accessing the API in the code is done in the following way :
private void SetUpApi () {
Utils.log.Info (_api.SetAPIOptions ("<CONNECTIONOPTIONS><CALCULATED_PRICES Enabled='true' MaximumDepth='4'/></CONNECTIONOPTIONS>"));
_api.OnServerConnect += OnServerConnect;
_api.OnServerDisconnect += OnServerDisconnect;
_api.OnNewData += OnNewData;
}
private void OnNewData(string strXML){
try{
if (strXML.Contains("<ORDER")){
ParseXMLAnswer(strXML, "OnNewData ()");
}
}
catch (Exception ex) {
if (ex.InnerException is AlreadyClosedException || ex.InnerException is BrokerUnreachableException)
Utils.log.Error("OnNewData () RabbitMQ.Client.Exceptions.AlreadyClosedException ");
else
Utils.printException("OnNewData ()", ex);
}
}
private void ParseXMLAnswer(string strOutputXML, string caller) {
XmlDocument doc = new XmlDocument();
doc.LoadXml(strOutputXML);
string jsonText = JsonConvert.SerializeXmlNode(doc);
var o = JObject.Parse(jsonText);
if (o["APIDATA"]["ORDER"].Type is JTokenType.Object){
JObject order = (JObject)o["APIDATA"]["ORDER"];
SendOrderToRMQ(order);
}
else if (o["APIDATA"]["ORDER"].Type is JTokenType.Array){
JArray orders = (JArray)o["APIDATA"]["ORDER"];
foreach (var item in orders.Children()){
SendOrderToRMQ((JObject)item);
}
}
doc = null;
GC.Collect();
GC.WaitForPendingFinalizers();
}
private void SendOrderToRMQ (JObject order){
JObject instrSpeciefier = (JObject) order["INSTSPECIFIER"];
var firstSeqID = instrSpeciefier.GetValue("@FirstSequenceID").ToString();
var firstSeqItemID = instrSpeciefier.GetValue("@FirstSequenceItemID").ToString();
if (sequenceItemsHashed.ContainsKey(firstSeqID) &&
sequenceItemsHashed[firstSeqID].Contains(firstSeqItemID)){
string itemName = Utils.ReplaceSensitiveCharacthers(instrSpeciefier.GetValue("@FirstSequenceItemName").ToString());
string instrumentName = Utils.ReplaceSensitiveCharacthers(instrSpeciefier.GetValue("@InstName").ToString());
int index = sequenceItemsHashed[firstSeqID].IndexOf(firstSeqItemID) + 1;
var binding = instrumentName + "." + sequencesFromInstruments[firstSeqID] + "." + itemName + "." + index;
serviceInstance1.Publish(
order.ToString(),
_exchangeName,
"",
binding);
}
order = null;
instrSpeciefier = null;
}
During peak business hours I get around 400 - 500 messages/second from the API. These messages come in the form of XML messages. For example, a message can contain several orders as shown in the example below. One element can contain an action to insert (to create) an Order and another to remove a certain Order.
<?xml version="1.0" encoding="utf-16"?>
<APIDATA xmlns="api-com">
<ORDER EngineID="0" PersistentOrderID="2791" ...>
<INSTSPECIFIER InstID="287" ... />
...
</ORDER>
<ORDER EngineID="0" PersistentOrderID="9840" ...>
<INSTSPECIFIER InstID="288" ... />
...
</ORDER>
The RabbitMQ server is configured and I connect to it using SSL (with a certificate and a key for it). I use to connect to the RMQ the RabbitMQ.Client v6.2.1. The exchange, the queues and the bindings are already defined in RabbitMQ. My Producer application only connects to it and starts publishing.
It is important that I use a synchronous publishing method as the order of messages we get is very important. For example in one message we get an action to create an Order and another message which comes immediately after which is telling to remove the same Order. If I would use an async method to publish to RMQ I would be possibly get the removal action before the insert action.
<?xml version="1.0" encoding="utf-16"?>
<APIDATA xmlns="api-com">
<ORDER ... PersistentOrderID="2791" OrderID="1234" ... Action="Insert" ...>
...
</ORDER>
The removal messages:
<?xml version="1.0" encoding="utf-16"?>
<APIDATA xmlns="api-com">
<ORDER ... PersistentOrderID="2791" OrderID="1234" ... Action="Remove" ...>
...
</ORDER>
I use the following method for publishing to RMQ: the object pool (Microsoft provides a package named Microsoft.Extensions.ObjectPool) - method described in here - https://www.c-sharpcorner.com/article/publishing-rabbitmq-message-in-asp-net-core/ .
I'm using here the following code:
class RabbitManager : IRabbitManager
{
private readonly DefaultObjectPool<IModel> _objectPool;
public RabbitManager(IPooledObjectPolicy<IModel> objectPolicy){
_objectPool = new DefaultObjectPool<IModel>(objectPolicy, Environment.ProcessorCount * 2);
}
public void Publish<T>(T message, string exchangeName, string exchangeType, string routeKey) where T : class {
if (message == null)
return;
var channel = _objectPool.Get();
try{
var sendBytes = Encoding.UTF8.GetBytes(message.ToString());
var properties = channel.CreateBasicProperties();
properties.ContentType = "application/json";
properties.DeliveryMode = 1; // Doesn't persist to disk
properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
channel.BasicPublish(exchangeName, routeKey, properties, sendBytes);
}
catch (Exception ex) {
throw ex;
}
finally {
_objectPool.Return(channel);
}
}
}
public class RabbitModelPooledObjectPolicy : IPooledObjectPolicy<IModel>
{
private readonly RabbitOptions _options;
private readonly IConnection _connection;
public RabbitModelPooledObjectPolicy(RabbitOptions _options){
this._options = _options;
_connection = GetConnection();
}
private IConnection GetConnection() {
var factory = new ConnectionFactory() {
HostName = _options.HostName,
UserName = _options.UserName,
Password = _options.Password,
//Port = _options.Port,
VirtualHost = _options.VHost,
};
if (!String.IsNullOrEmpty(_options.CertPath))
{
factory.RequestedConnectionTimeout = TimeSpan.FromMilliseconds(5000);
factory.Ssl.AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateChainErrors;
factory.Ssl.CertificateValidationCallback += new RemoteCertificateValidationCallback(ValidateServerCertificate);
factory.Ssl.ServerName = _options.HostName;
factory.Ssl.CertPath = _options.CertPath;
factory.Ssl.CertPassphrase = _options.CertPass;
factory.Ssl.Version = SslProtocols.Tls12;
factory.Ssl.Enabled = true;
}
factory.RequestedHeartbeat = TimeSpan.FromSeconds(1);
factory.AutomaticRecoveryEnabled = true; // enable automatic connection recovery
factory.RequestedChannelMax = 32;
var _connection = factory.CreateConnection();
_connection.ConnectionShutdown += Connection_ConnectionShutdown;
return _connection;
}
private void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e){
Utils.log.Info("Connection broke!");
}
private bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors){
return true;
}
public IModel Create(){
return _connection.CreateModel();
}
public bool Return(IModel obj) {
if (obj.IsOpen) {
return true;
}
else {
obj?.Dispose();
return false;
}
}
}
Below is a screenshot with the problem - the constant memory inscrease :
This is the stack trace of a memory snapshot taken just after the above screenshot :
Just after the screenshot above was taken I got the following error message in my program, in the console :
26-04-2021 10:41:48 - OnNewData () RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=0, text='End of stream', classId=0, methodId=0, cause=System.IO.EndOfStreamException: Reached the end of the stream. Possible authentication failure.
at RabbitMQ.Client.Impl.InboundFrame.ReadFrom(Stream reader, Byte[] frameHeaderBuffer)
at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()
at RabbitMQ.Client.Framing.Impl.Connection.MainLoop()
at RabbitMQ.Client.Framing.Impl.Connection.EnsureIsOpen()
at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.CreateModel()
at RabbitModelPooledObjectPolicy.Create() in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\RabbitMQ\RabbitModelPooledObjectPolicy.cs:line 77
at Microsoft.Extensions.ObjectPool.DefaultObjectPool`1.Create()
at Microsoft.Extensions.ObjectPool.DefaultObjectPool`1.Get()
at RabbitManager.Publish[T](T message, String exchangeName, String exchangeType, String routeKey) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\RabbitMQ\RabbitManager.cs:line 32
at ConsoleApp1.Service1.SendOrderToRMQ(JObject order) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 411
at ConsoleApp1.Service1.ParseXMLAnswer(String strOutputXML, String caller) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 372
at ConsoleApp1.Service1.OnNewData(String strXML) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 348
26-04-2021 10:41:48 - OnNewData () RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=0, text='End of stream', classId=0, methodId=0, cause=System.IO.EndOfStreamException: Reached the end of the stream. Possible authentication failure.
at RabbitMQ.Client.Impl.InboundFrame.ReadFrom(Stream reader, Byte[] frameHeaderBuffer)
at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()
at RabbitMQ.Client.Framing.Impl.Connection.MainLoop()
at RabbitMQ.Client.Framing.Impl.Connection.EnsureIsOpen()
at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.CreateModel()
at RabbitModelPooledObjectPolicy.Create() in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\RabbitMQ\RabbitModelPooledObjectPolicy.cs:line 77
at Microsoft.Extensions.ObjectPool.DefaultObjectPool`1.Create()
at Microsoft.Extensions.ObjectPool.DefaultObjectPool`1.Get()
at RabbitManager.Publish[T](T message, String exchangeName, String exchangeType, String routeKey) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\RabbitMQ\RabbitManager.cs:line 32
at ConsoleApp1.Service1.SendOrderToRMQ(JObject order) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 411
at ConsoleApp1.Service1.ParseXMLAnswer(String strOutputXML, String caller) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 372
at ConsoleApp1.Service1.OnNewData(String strXML) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 348
And this leads to the final crash of the program :
What I did to prevent the memory leak :
- The classes have the IDisposable interfaces enabled
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
// free managed resources
_onTimePerHour.Dispose();
_api.OnNewData -= OnNewData;
}
// free native resources if there are any.
}
- Forced garbage collection after each message received :
private void ParseXMLAnswer(string strOutputXML, string caller) {
...
doc = null;
GC.Collect();
GC.WaitForPendingFinalizers();
}
This helps a bit more and now the memory problem i have increases after a longer period of time.
- I use the ReShaper (enter link description here) plugin for Visual Studio to give me a better understanding of the Stack Trace of the memory problem but it does not help much.
What do I think the problem is :
The RabbitMQ Producer app gets many messages per second which are then parsed, split into several JSON messages and send to RMQ using the same channel. The following cases might occur :
- I'm publishing on a single RMQ channel and somehow i should use several channels (one connection but multiple channels)
- I'm receiving more messages than I can parse and send through RMQ using the RabbitMQ.Client .net library
- I'm holding up some references in memory for some objects (maybe messages) which do not get freed up;
Did someone had this problem before ? Because I cannot find anywhere any info on this "SingleProducerSingleConsumerQueue+Segment<Memory> out of memory" issue.
Does someone know how to analyse more in depth this problem ?
Big thanks !
Edit 1
I guess more info is needed to solve this memory issue.
I have several consumers which consume data from RabbitMQ (like NodeJS and python apps). Thus, I need to design the RabbitMQ Producer in a generic way, as each consumer needs different data. And I cannot modify and restart my RabbitMQ Producer each time I have a new consumer app. So I need to publish my messages in a generic way.
For example, each consumer has its own dedicated queue, with dedicated bindings. Let's say I have consumer1 with queue cons1 and bindings :
- marketName.productName.*.1 (the productName would correspond for days).
This binding is dynamic and for now it corresponds to Monday (04.April) but tomorrow it will correspond to Tueday (05.April).
So I need to store in memory the marketNames and productNames by using
private static read-only Dictionary<string, List<string>> sequenceItemsHashed = new Dictionary<string, List<string>>();
private static readonly Dictionary<string, string> sequencesFromInstruments = new Dictionary<string, string>();
I mention that sequenceItemsHashed corrrespond to marketNames and sequencesFromInstruments to productNames in my logic.
This way I send all the messages to RMQ and I do the sorting in RMQ afterwards using bindings.
Edit 2
From what I understood, in order to solve my question I need something like the following architecture (enter link description here) :
So multiple threads in my single connection to the RMQ server and one channel per thread.
Edit 3
Implemented successfully the pipelines, the ConcurrentQueue and the Consumer Thread which pushes to RMQ but still have memory issues:
private readonly TransformBlock<string, string> orderFilter;
private readonly TransformBlock<string, JObject> xmlParser;
//private readonly TransformBlock<XmlDocument, JObject> xmlToJsonTransformer;
private readonly TransformManyBlock<JObject, JToken> jsonOrderFactory;
private readonly ActionBlock<JToken> messageSender;
ConcurrentQueue<JToken> concurrentQueue = new ConcurrentQueue<JToken>();
public Service1 (string [] args) {
...
// setup pipeline blocks
orderFilter = new TransformBlock<string, string>(FilterIncomingMessages);
xmlParser = new TransformBlock<string, JObject>(ParseXml);
jsonOrderFactory = new TransformManyBlock<JObject, JToken>(CreateOrderMessages);
messageSender = new ActionBlock<JToken>(SendMessage);
// build your pipeline
orderFilter.LinkTo(xmlParser, x => !string.IsNullOrEmpty(x));
orderFilter.LinkTo(DataflowBlock.NullTarget<string>()); // for non-order msgs
xmlParser.LinkTo(jsonOrderFactory);
jsonOrderFactory.LinkTo(messageSender, new DataflowLinkOptions { PropagateCompletion = true });
Task t2 = Task.Factory.StartNew(() =>
{
while (true) {
if (!concurrentQueue.IsEmpty)
{
JToken number;
while (concurrentQueue.TryDequeue(out number))
{
_rabbitMQ.PublishMessages(
Encoding.ASCII.GetBytes(number.ToString()),
"test"
);
}
} else
{
Thread.Sleep(1);
}
}
});
...
}
private string FilterIncomingMessages(string strXml){
if (strXml.Contains("<ORDER")) return strXml;
return null;
}
private JObject ParseXml(string strXml){
XmlDocument doc = new XmlDocument();
doc.LoadXml(strXml);
string jsonText = JsonConvert.SerializeXmlNode(doc);
var o = JObject.Parse(jsonText);
return o;
}
private IEnumerable<JToken> CreateOrderMessages(JObject o){
List<JToken> myList = new List<JToken>();
if (o.ContainsKey("GV8APIDATA")){
if (o["GV8APIDATA"]["ORDER"].Type is JTokenType.Object){
JToken order = o["GV8APIDATA"]["ORDER"];
myList.Add(order);
}
else if (o["GV8APIDATA"]["ORDER"].Type is JTokenType.Array){
JToken orders = o["GV8APIDATA"]["ORDER"];
foreach (var order in orders.Children()){
myList.Add(order);
}
}
}
return myList.ToArray ();
}
private void SendMessage(JToken order){
concurrentQueue.Enqueue(order);
}
The new solution helps to break the logic into several small parts but I still have a constant memory increase.
Edit 4
Taking into account @Fildor's answer I did the following :
Instead of converting strings containing xml with <ORDER ...> elements to JSON, I deserialize XML to objects using the pipelines and the code below.
I removed the part with the Thread and the ConcurrentQueue and I'm publishing directly in the last ActionBlock.
This solves my memory leak problem, but there are other problems like :
- If the messages are big enough I will only be able to print around 120 messages / second. I get the rate of 1780 messages/s if I just print the simple string "test".
public Service1 (string [] args) {
...
// setup pipeline blocks
orderFilter = new TransformBlock<string, string>(FilterIncomingMessages);
xmlParser = new TransformBlock<string, OrdersResponse>(ParseXml);
jsonOrderFactory = new TransformManyBlock<OrdersResponse, Order>(CreateOrderMessages);
messageSender = new ActionBlock<Order>(SendMessage);
// build your pipeline
orderFilter.LinkTo(xmlParser, x => !string.IsNullOrEmpty(x));
orderFilter.LinkTo(DataflowBlock.NullTarget<string>()); // for non-order msgs
xmlParser.LinkTo(jsonOrderFactory);
jsonOrderFactory.LinkTo(messageSender, new DataflowLinkOptions { PropagateCompletion = true });
RunAsConsole(args);
}
private readonly TransformBlock<string, string> orderFilter;
private readonly TransformBlock<string, OrdersResponse> xmlParser;
private readonly TransformManyBlock<OrdersResponse, Order> jsonOrderFactory;
private readonly ActionBlock<Order> messageSender;
private void OnNewData(string strXML){
orderFilter.Post(strXML);
}
private string FilterIncomingMessages(string strXml){
if (strXml.Contains("<ORDER")) return strXml;
return null;
}
private OrdersResponse ParseXml(string strXml) {
var rootDataObj = DeserializeOrdersFromXML(strXml);
return rootDataObj;
}
private OrdersResponse DeserializeOrdersFromXML(string strOutputXML){
var xsExpirations = new XmlSerializer(typeof(OrdersResponse));
OrdersResponse rootDataObj = null;
using (TextReader reader = new StringReader(strOutputXML)) {
rootDataObj = (OrdersResponse)xsExpirations.Deserialize(reader);
reader.Close();
}
return rootDataObj;
}
private IEnumerable<Order> CreateOrderMessages(OrdersResponse o){
return o.orders;
}
private void SendMessage(Order order) {
_rabbitMQ.PublishMessages(
Encoding.ASCII.GetBytes(order.ToString()),
"test"
);
}
And the ORDER object looking like :
[Serializable()]
[XmlRoot (ElementName = "ORDER")]
public class Order : IDisposable {
public void Dispose()
{
EngineID = null;
PersistentOrderID = null;
...
InstrumentSpecifier.Dispose();
InstrumentSpecifier = null;
GC.SuppressFinalize(this);
}
[XmlAttribute (AttributeName = "EngineID")]
public string EngineID { get; set; }
[XmlAttribute (AttributeName = "PersistentOrderID")]
public string PersistentOrderID { get; set; }
...
[XmlElement(ElementName = "INSTSPECIFIER")]
public InstrumentSpecifier InstrumentSpecifier { get; set; }
}
And my new RabbitMQ class :
public class RMQ : IDisposable {
private IConnection _connection;
public IModel Channel { get; private set; }
private readonly ConnectionFactory _connectionFactory;
private readonly string _exchangeName;
public RMQ (RabbitOptions _rabbitOptions){
try{
// _connectionFactory initialization
_connectionFactory = new ConnectionFactory()
{
HostName = _rabbitOptions.HostName,
UserName = _rabbitOptions.UserName,
Password = _rabbitOptions.Password,
VirtualHost = _rabbitOptions.VHost,
};
this._exchangeName = _rabbitOptions.ExchangeName;
if (!String.IsNullOrEmpty(_rabbitOptions.CertPath)){
_connectionFactory.RequestedConnectionTimeout = TimeSpan.FromMilliseconds(5000);
_connectionFactory.Ssl.AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateChainErrors;
_connectionFactory.Ssl.CertificateValidationCallback += new RemoteCertificateValidationCallback(ValidateServerCertificate);
_connectionFactory.Ssl.ServerName = _rabbitOptions.HostName;
_connectionFactory.Ssl.CertPath = _rabbitOptions.CertPath;
_connectionFactory.Ssl.CertPassphrase = _rabbitOptions.CertPass;
_connectionFactory.Ssl.Version = SslProtocols.Tls12;
_connectionFactory.Ssl.Enabled = true;
}
_connectionFactory.RequestedHeartbeat = TimeSpan.FromSeconds(1);
_connectionFactory.AutomaticRecoveryEnabled = true; // enable automatic connection recovery
//_connectionFactory.RequestedChannelMax = 10;
if (_connection == null || _connection.IsOpen == false){
_connection = _connectionFactory.CreateConnection();
_connection.ConnectionShutdown += Connection_ConnectionShutdown;
}
if (Channel == null || Channel.IsOpen == false){
Channel = _connection.CreateModel();
}
Utils.log.Info("ConnectToRabbitMQ () Connecting to RabbitMQ. rabbitMQenvironment = ");
}
catch (Exception ex){
Utils.log.Error("Connection to RabbitMQ failed ! HostName = " + _rabbitOptions.HostName + " VirtualHost = " + _rabbitOptions.VHost);
Utils.printException("ConnectToRMQ ()", ex);
}
}
private void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e){
Utils.log.Info ("Connection broke!");
try{
if (ReconnectToRMQ()){
Utils.log.Info("Connected!");
}
}
catch (Exception ex){
Utils.log.Info("Connect failed!" + ex.Message);
}
}
private bool ReconnectToRMQ(){
if (_connection == null || _connection.IsOpen == false){
_connection = _connectionFactory.CreateConnection();
_connection.ConnectionShutdown += Connection_ConnectionShutdown;
}
if (Channel == null || Channel.IsOpen == false){
Channel = _connection.CreateModel();
return true;
}
return false;
}
private bool ValidateServerCertificate (object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) {
return true;
}
public void DisconnectFromRMQ () {
Channel.Close ();
_connection.Close ();
}
public void Dispose(){
try{
Channel?.Close();
Channel?.Dispose();
Channel = null;
_connection?.Close();
_connection?.Dispose();
_connection = null;
}
catch (Exception e){
Utils.log.Error("Cannot dispose RabbitMQ channel or connection" + e.Message);
}
}
public void PublishMessages (byte [] message, string routingKey) {
if (this._connection == null || ! _connection.IsOpen) {
Utils.log.Error ("PublishMessages(), Connect failed! this.conn == null || !conn.IsOpen ");
ReconnectToRMQ();
} else {
var properties = Channel.CreateBasicProperties();
properties.Persistent = true;
Channel.BasicPublish (_exchangeName, routingKey, properties, message);
//serviceInstance1.Publish(message, _rabbitOptions.ExchangeName, "", routingKey);
}
}
}
Now the funny thing is if I publish only a small string like "test" to RabbitMQ to my predefined queue I can publish more than 1780 messages/second.