I have a microservice (Web API) in an event-driven architecture. It receives messages from RabbitMQ and is supposed to save them into a PostgreSQL database using ADO.NET.

Unfortunately, my connection pool (currently set to 50) gets exhausted quite fast, giving me this error message:

The connection pool has been exhausted, either raise MaxPoolSize

My RabbitMQ Consumer is set up like this (Singleton):

public class Listener : RabbitMQConnection
{
    public AsyncEventingBasicConsumer _asyncConsumer;
    private static readonly SemaphoreSlim AsyncLock = new SemaphoreSlim(1, 1);

    public Listener()
    {
        _asyncConsumer = new AsyncEventingBasicConsumer(_channel);
        _asyncConsumer.Received += ConsumerReceived;        
    }   

    public async Task ConsumerReceived(object sender, BasicDeliverEventArgs message)
    {
        await AsyncLock.WaitAsync();
        try
        {
            //Performing logic and saving into database
            //....
            using (var ctx = ContextFactory.GetContext<PostgreSqlDatabaseContext>(_connectionString))
            {
                //Creating query with StringBuilder... 
                await ctx.Database.ExecuteSqlCommandAsync(query.ToString(), parameters);
            }

            _channel.BasicAck(message.DeliveryTag, false);
        }
        catch (DecoderFallbackException decoderFallbackException)
        {
            _logger.LogError($"...");
            _channel.BasicNack(message.DeliveryTag, false, false);
        }       
        finally {
            AsyncLock.Release();
        }       
    }
}

ContextFactory:

internal class ContextFactory
{
    public static T GetContext<T>(string sqlConnection) where T : DbContext
    {
        var optionsBuilder = new DbContextOptionsBuilder<PostgreSqlDatabaseContext>();
        optionsBuilder.UseNpgsql(sqlConnection);
        return new PostgreSqlDatabaseContext(optionsBuilder.Options) as T;
    }
}

RabbitMQConnection:

public abstract class RabbitMQConnection 
{
    public IModel _channel;
    public IBasicProperties _properties;
    public AsyncEventingBasicConsumer _asyncConsumer;
    public ConnectionFactory _factory;
    public ConnectConfiguration _connectConfiguration;
    bool isConnected = false;

    public void Connect(ConnectConfiguration connectConfiguration)
    {
        if (!isConnected)
        {
            _connectConfiguration = connectConfiguration;
            CreateFactory(_connectConfiguration);
            SetupConfiguration(_connectConfiguration.Exchange);
        }
    }

    private void CreateFactory(ConnectConfiguration config)
    {
        _factory = new ConnectionFactory
        {
            AutomaticRecoveryEnabled = true,
            DispatchConsumersAsync = true,
            UseBackgroundThreadsForIO = true,
            RequestedHeartbeat = 15,
            HostName = config.Server,
            UserName = config.UserName,
            Password = config.Password
        };

        if (!string.IsNullOrWhiteSpace(config.Vhost))
            _factory.VirtualHost = config.Vhost;
    }

    private void SetupConfiguration(string exchange)
    {
        var connection = _factory.CreateConnection();
        _channel = connection.CreateModel();

        _properties = _channel.CreateBasicProperties();
        _properties.Persistent = true;

        _channel.BasicQos(0, 10, false);
        _channel.ExchangeDeclare(exchange, "topic", true);

        isConnected = true;
    }
}

I can't understand why I keep getting this error. Isn't the SemaphoreSlim with WaitAsync() and Release() supposed to prevent the ConsumerReceived method from running its logic for more than one message at a time?
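A minimal sketch of the expectation described above, using only the base class library (no RabbitMQ, no database). The names `GuardedSectionDemo` and `RunAsync` are hypothetical, and `Task.Delay` merely stands in for the database call; this is an illustration of what `SemaphoreSlim(1, 1)` is assumed to guarantee, not a reproduction of the problem:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class GuardedSectionDemo
{
    private static readonly SemaphoreSlim AsyncLock = new SemaphoreSlim(1, 1);
    private static readonly object Gate = new object();
    private static int _inside;     // handlers currently inside the guarded section
    private static int _maxInside;  // highest value _inside ever reached

    private static async Task HandleAsync()
    {
        await AsyncLock.WaitAsync();
        try
        {
            int now = Interlocked.Increment(ref _inside);
            lock (Gate) { _maxInside = Math.Max(_maxInside, now); }

            await Task.Delay(10); // hypothetical stand-in for the database work

            Interlocked.Decrement(ref _inside);
        }
        finally
        {
            AsyncLock.Release();
        }
    }

    // Starts handlerCount simulated handlers concurrently and returns the
    // largest number that were inside the guarded section at the same time.
    public static async Task<int> RunAsync(int handlerCount)
    {
        _inside = 0;
        _maxInside = 0;
        var tasks = Enumerable.Range(0, handlerCount)
                              .Select(_ => Task.Run(() => HandleAsync()));
        await Task.WhenAll(tasks);
        return _maxInside;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync(20)); // prints 1
    }
}
```

If the semaphore behaves as assumed, no two handlers are ever inside the guarded section together, so at most one context created in that section should be alive at any moment.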

Marcus
  • When it complains about running out of *connections* then you want to count CreateConnection(), not received messages. Consider that the real problem might be that you never dispose a connection. – Hans Passant Mar 20 '18 at 12:19
  • Where's the code that *creates* connections and *disposes* them? Where is the `using (var connection = new NpgsqlConnection(…)) { … }` call? ADO.NET already manages connections and ensures that opening a new one is *cheap* through the *connection pool*. When you *dispose/close* a connection it's reset and placed in the pool. If you try to manage the connections yourself and never close them, a) you'll end up with hundreds of open connections and b) you'll cause a *lot* of blocking in the database, as locks and transactions will accumulate – Panagiotis Kanavos Mar 20 '18 at 12:26
  • What do you mean @HansPassant? I am not running out of _RabbitMQ connections_, I am running out of _Npgsql connections_. Isn't the _SemaphoreSlim_ supposed to make the logic between WaitAsync() and Release() run to completion before another thread can access the same portion of the code? – Marcus Mar 20 '18 at 12:30
  • Another thread will access the same portion only after completion, true. But do you dispose the database connection within it? Because if not, the pool will be exhausted on the 50th run of this code – Evgeny Gorbovoy Mar 20 '18 at 12:32
  • See update - I do dispose the db connection with using statement. – Marcus Mar 20 '18 at 12:33
  • It doesn't seem like the problem code is displayed above – theMayer Mar 20 '18 at 12:37
  • Oh, maybe try new SemaphoreSlim(0, 1); I'm just guessing. I worked with it a month ago and had the same problem. – Evgeny Gorbovoy Mar 20 '18 at 12:37
  • Naturally the reason is that the connection pool is exhausted, but I am using every instance of the context within a using statement. – Marcus Mar 20 '18 at 12:47
  • What is the status of the database connections when the pool is exhausted? You can see them in pgAdmin. I would say that it may be a database lock that prevents the connections from being released. You may also try decreasing the command timeout. –  Mar 20 '18 at 13:40
  • In the `pg_stat_database`? I can see `temp_files` is 87 and `numbackends` is 13 (connection pool specified is 50). – Marcus Mar 20 '18 at 14:01
  • How about providing a simple, complete and minimal code sample that reproduces the issue without any RabbitMQ? This way we can understand exactly what you're doing. – Shay Rojansky Mar 21 '18 at 09:17
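
The pooling behaviour described in the comments above (a disposed connection goes back to the pool, an undisposed one stays checked out) can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyPool`, `Lease`, and `CountSuccessfulRentals` are hypothetical names, the class is not thread-safe, and it is in no way Npgsql's actual implementation.

```csharp
using System;

// Toy model only: NOT how Npgsql implements its pool.
public sealed class ToyPool
{
    private readonly int _maxSize;
    private int _inUse;

    public ToyPool(int maxSize) { _maxSize = maxSize; }

    public int InUse => _inUse;

    // Hands out a lease, or throws once every slot is checked out.
    public Lease Rent()
    {
        if (_inUse >= _maxSize)
            throw new InvalidOperationException("pool exhausted");
        _inUse++;
        return new Lease(this);
    }

    public sealed class Lease : IDisposable
    {
        private ToyPool _owner;

        internal Lease(ToyPool owner) { _owner = owner; }

        // Disposing returns the slot to the pool (at most once).
        public void Dispose()
        {
            if (_owner == null) return;
            _owner._inUse--;
            _owner = null;
        }
    }
}

public static class ToyPoolDemo
{
    // Returns how many rentals succeeded before the pool ran out.
    public static int CountSuccessfulRentals(int maxSize, int attempts, bool dispose)
    {
        var pool = new ToyPool(maxSize);
        int succeeded = 0;
        try
        {
            for (int i = 0; i < attempts; i++)
            {
                var lease = pool.Rent();
                succeeded++;
                if (dispose) lease.Dispose();
            }
        }
        catch (InvalidOperationException)
        {
            // Pool exhausted: stop counting.
        }
        return succeeded;
    }

    public static void Main()
    {
        Console.WriteLine(CountSuccessfulRentals(50, 200, dispose: true));  // prints 200
        Console.WriteLine(CountSuccessfulRentals(50, 200, dispose: false)); // prints 50
    }
}
```

In this model, sequential rentals that are always disposed never exhaust a pool of 50, which is why the comments focus on finding a code path where a connection is not returned.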

0 Answers