0

Based on below answer, I implemented least loaded connection for StackExchange.Redis:

https://stackoverflow.com/a/58106770

public class RedisConnectionWrapper : IRedisConnectionWrapper
{
    #region Fields

    private readonly Config _config;
    private bool _disposed = false;
    private readonly Lazy<string> _connectionString;
    private static ConcurrentBag<Lazy<ConnectionMultiplexer>> _connections;
    
    #endregion

    #region Ctor

    public RedisConnectionWrapper(Config config)
    {
        _config = config;
        _connectionString = new Lazy<string>("CONNECTION_STRING");
        ConnectionMultiplexer.SetFeatureFlag("preventthreadtheft", _config.RedisPreventThreadTheft);

        if (_config.UseLeastLoadedConnection)
            InitializeLeastLoadedConnections();
    }

    #endregion

    /// <summary>
    /// Initialize lazy connections to Redis servers
    /// </summary>
    /// <returns></returns>
    private void InitializeLeastLoadedConnections()
    {
        _connections = new ConcurrentBag<Lazy<ConnectionMultiplexer>>();

        for (var i = 0; i < _config.PoolSize; i++)
        {
            var connection = ConnectionMultiplexer.Connect(_connectionString.Value);

            connection.IncludePerformanceCountersInExceptions = true;

            _connections.Add(new Lazy<ConnectionMultiplexer>(connection));
        }
    }

    /// <summary>
    /// Get least loaded connection to Redis servers
    /// </summary>
    /// <returns></returns>
    protected ConnectionMultiplexer GetLeastLoadedConnection()
    {
        Lazy<ConnectionMultiplexer> connection;

        var loadedLazys = _connections.Where(lazy => lazy.IsValueCreated && lazy.Value.IsConnected);

        if (loadedLazys.Count() == _connections.Count)
        {
            var minValue = _connections.Min(lazy => lazy.Value.GetCounters().TotalOutstanding);
            connection = _connections.First(lazy => lazy.Value.GetCounters().TotalOutstanding == minValue);
        }
        else
        {
            Console.WriteLine("Creating a new connection to Redis");
            connection = _connections.First(lazy => !lazy.IsValueCreated);
        }

        return connection.Value;
    }

    /// <summary>
    /// Release all resources associated with this object
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    // Protected implementation of Dispose pattern.
    protected virtual void Dispose(bool disposing)
    {
        if (_disposed)
            return;

        if (disposing)
        {
            if (_config.UseLeastLoadedConnection)
            {
                var activeConnections = _connections.Where(lazy => lazy.IsValueCreated).ToList();
                activeConnections.ForEach(connection => connection.Value.Dispose());
            }
        }
        _disposed = true;
    }

But I'm getting this exception on high traffic:

System.InvalidOperationException: Sequence contains no matching element at System.Linq.ThrowHelper.ThrowNoMatchException() at System.Linq.Enumerable.First[TSource](IEnumerable1 source, Func2 predicate)

Anybody can help me?

  • It might be a race condition between checking for any connections that haven't been connected and then looking for the first one that isn't connected. – stuartd Nov 03 '21 at 16:43
  • I'm checking `IsConnected` property when I'm getting lazy connections: `var loadedLazys = _connections.Where(lazy => lazy.IsValueCreated && lazy.Value.IsConnected);` @stuartd – Milad Sadri Nov 03 '21 at 16:47
  • It might help if you clarified _which_ `First()` call throws the error – stuartd Nov 04 '21 at 14:54
  • It was happening cause of the first one. I did some changes in my code and it worked fine. I described the changes in my answer below. Thanks for your responses @stuartd – Milad Sadri Nov 05 '21 at 09:09

1 Answers1

0

After some traces, I did some changes and it worked:

    var loadedLazys = _connections.Where(lazy => lazy.IsValueCreated);

    if (loadedLazys.Count() == _connections.Count)
    {
       connection = _connections.OrderBy(lazy => lazy.Value.GetCounters().TotalOutstanding).First();
    }

I changed this part of my code as well:

    private void InitializeLeastLoadedConnections()
    {
        lock (_lock)
        {
            _connections = new ConcurrentBag<Lazy<ConnectionMultiplexer>>();
            for (var i = 0; i < _config.PoolSize; i++)
            {
                _connections.Add(new Lazy<ConnectionMultiplexer>(() =>
                {
                    var connection = ConnectionMultiplexer.Connect(_connectionString.Value);
                    connection.IncludePerformanceCountersInExceptions = true;
                    return connection;
                }));
            }
        }
    }

Also I changed the return type of GetLeastLoadedConnection() from ConnectionMultiplexer to IConnectionMultiplexer.