I am implementing a caching layer for my ASP.NET Core 3.1 Web API.
Starting Implementation
public interface ICache
{
    T Get<T>(string key);
    void Set<T>(string key, T value);
}

public static class ICacheExtensions
{
    public static T GetOrCreate<T>(this ICache cache, string key, Func<T> factory)
    {
        var value = cache.Get<T>(key);
        if (EqualityComparer<T>.Default.Equals(value, default(T)))
        {
            value = factory();
            if (!EqualityComparer<T>.Default.Equals(value, default(T)))
            {
                cache.Set(key, value);
            }
        }
        return value;
    }

    public static async Task<T> GetOrCreateAsync<T>(this ICache cache, string key, Func<Task<T>> factory)
    {
        var value = cache.Get<T>(key);
        if (EqualityComparer<T>.Default.Equals(value, default(T)))
        {
            value = await factory().ConfigureAwait(false);
            if (!EqualityComparer<T>.Default.Equals(value, default(T)))
            {
                cache.Set(key, value);
            }
        }
        return value;
    }
}
This works fine, but one known problem I'm trying to address is that it is susceptible to cache stampedes. If my API is handling many requests that all try to access the same key using one of the GetOrCreate methods at the same time, they will each run a parallel instance of the factory function. This means redundant work and wasted resources.
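To make the stampede concrete, here is a small repro sketch of what I'd expect to see (the DictionaryCache below is a toy stand-in for my real cache, not production code; the factory just counts its own invocations):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Toy ICache backed by a ConcurrentDictionary, for illustration only.
public sealed class DictionaryCache : ICache
{
    private readonly ConcurrentDictionary<string, object> _items =
        new ConcurrentDictionary<string, object>();

    public T Get<T>(string key) => _items.TryGetValue(key, out var v) ? (T)v : default;
    public void Set<T>(string key, T value) => _items[key] = value;
}

public static class StampedeRepro
{
    public static async Task Main()
    {
        var cache = new DictionaryCache();
        var factoryCalls = 0;

        // 50 concurrent requests for the same key: they all miss the
        // (initially empty) cache and each runs the factory in parallel.
        var requests = Enumerable.Range(0, 50).Select(_ =>
            cache.GetOrCreateAsync("expensive", async () =>
            {
                Interlocked.Increment(ref factoryCalls);
                await Task.Delay(100); // simulate slow work
                return "result";
            }));

        await Task.WhenAll(requests);
        Console.WriteLine($"factory ran {factoryCalls} times"); // typically far more than once
    }
}
```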
To address this, I've attempted to introduce mutexes so that only one instance of the factory function can run per cache key at a time.
Introduce Mutexes
public interface ICache
{
    T Get<T>(string key);
    void Set<T>(string key, T value);
}

public static class ICacheExtensions
{
    public static T GetOrCreate<T>(this ICache cache, string key, Func<T> factory)
    {
        using var mutex = new Mutex(false, key);
        var value = cache.Get<T>(key);
        if (EqualityComparer<T>.Default.Equals(value, default(T)))
        {
            mutex.WaitOne();
            try
            {
                value = cache.Get<T>(key);
                if (EqualityComparer<T>.Default.Equals(value, default(T)))
                {
                    value = factory();
                    if (!EqualityComparer<T>.Default.Equals(value, default(T)))
                    {
                        cache.Set(key, value);
                    }
                }
            }
            finally
            {
                mutex.ReleaseMutex();
            }
        }
        return value;
    }

    public static async Task<T> GetOrCreateAsync<T>(this ICache cache, string key, Func<Task<T>> factory)
    {
        using var mutex = new Mutex(false, key);
        var value = cache.Get<T>(key);
        if (EqualityComparer<T>.Default.Equals(value, default(T)))
        {
            mutex.WaitOne();
            try
            {
                value = cache.Get<T>(key);
                if (EqualityComparer<T>.Default.Equals(value, default(T)))
                {
                    value = await factory().ConfigureAwait(false);
                    if (!EqualityComparer<T>.Default.Equals(value, default(T)))
                    {
                        cache.Set(key, value);
                    }
                }
            }
            finally
            {
                mutex.ReleaseMutex();
            }
        }
        return value;
    }
}
This works great for GetOrCreate(), but GetOrCreateAsync() throws an exception. It turns out mutexes are thread-bound: if WaitOne() and ReleaseMutex() are called on different threads (as tends to happen with async/await), the mutex doesn't like that and throws. I found this other SO question that describes some workarounds, and decided to go with a custom task scheduler. SingleThreadedTaskScheduler schedules tasks using a thread pool containing exactly one thread, and I intend to interact with the mutex only from that thread.
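For reference, a minimal repro of the thread-affinity problem looks like this, assuming the await resumes on a different thread-pool thread (which is what usually happens in an app with no synchronization context):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class MutexAffinityRepro
{
    public static async Task Main()
    {
        using var mutex = new Mutex(false, "demo-key");

        mutex.WaitOne(); // acquired on the current thread

        // After this await, execution likely resumes on a different pool thread.
        await Task.Delay(100).ConfigureAwait(false);

        try
        {
            mutex.ReleaseMutex(); // throws when the releasing thread isn't the owner
        }
        catch (ApplicationException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}
```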
SingleThreadedTaskScheduler
internal sealed class SingleThreadedTaskScheduler : TaskScheduler, IDisposable
{
    private readonly Thread _thread;
    private BlockingCollection<Task> _tasks;

    public SingleThreadedTaskScheduler()
    {
        _tasks = new BlockingCollection<Task>();
        _thread = new Thread(() =>
        {
            foreach (var t in _tasks.GetConsumingEnumerable())
            {
                TryExecuteTask(t);
            }
        });
        _thread.IsBackground = true;
        _thread.Start();
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return _tasks.ToArray();
    }

    protected override void QueueTask(Task task)
    {
        _tasks.Add(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    public void Dispose()
    {
        _tasks?.CompleteAdding();
        _thread?.Join();
        _tasks?.Dispose();
        _tasks = null;
    }
}
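As a quick sanity check on the scheduler (a throwaway sketch, not part of the caching code), every delegate it runs should report the same managed thread id:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SchedulerCheck
{
    public static async Task Main()
    {
        using var scheduler = new SingleThreadedTaskScheduler();

        // Both delegates are queued to the scheduler's single dedicated thread.
        int first = await Task.Factory.StartNew(
            () => Thread.CurrentThread.ManagedThreadId,
            CancellationToken.None, TaskCreationOptions.None, scheduler);
        int second = await Task.Factory.StartNew(
            () => Thread.CurrentThread.ManagedThreadId,
            CancellationToken.None, TaskCreationOptions.None, scheduler);

        Console.WriteLine(first == second); // True: same dedicated thread
    }
}
```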
GetOrCreateAsync with SingleThreadedTaskScheduler
private static readonly TaskScheduler _mutexTaskScheduler = new SingleThreadedTaskScheduler();

public static async Task<T> GetOrCreateAsync<T>(this ICache cache, string key, Func<Task<T>> factory)
{
    using var mutex = new Mutex(false, key);
    var value = cache.Get<T>(key);
    if (EqualityComparer<T>.Default.Equals(value, default(T)))
    {
        await Task.Factory
            .StartNew(() => mutex.WaitOne(), CancellationToken.None, TaskCreationOptions.None, _mutexTaskScheduler)
            .ConfigureAwait(false);
        try
        {
            value = cache.Get<T>(key);
            if (EqualityComparer<T>.Default.Equals(value, default(T)))
            {
                value = await factory().ConfigureAwait(false);
                if (!EqualityComparer<T>.Default.Equals(value, default(T)))
                {
                    cache.Set(key, value);
                }
            }
        }
        finally
        {
            await Task.Factory
                .StartNew(() => mutex.ReleaseMutex(), CancellationToken.None, TaskCreationOptions.None, _mutexTaskScheduler)
                .ConfigureAwait(false);
        }
    }
    return value;
}
With this implementation, the exception is resolved, but GetOrCreateAsync still calls the factory function many times in a cache stampede scenario. Am I missing something?
I've also tried using SemaphoreSlim instead of Mutex, which plays nicer with async/await. The problem there is that .NET doesn't support named semaphores on Linux, so I'd have to keep all my semaphores in a Dictionary<string, SemaphoreSlim>, and that would be too cumbersome to manage.
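For reference, the keyed-semaphore approach I'm trying to avoid would look roughly like this sketch (it sidesteps the question of when entries can ever be removed from the dictionary, which is exactly the management burden I mean):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class KeyedSemaphoreCacheExtensions
{
    // One SemaphoreSlim per cache key. Nothing ever removes entries,
    // so this grows with the number of distinct keys.
    private static readonly ConcurrentDictionary<string, SemaphoreSlim> _locks =
        new ConcurrentDictionary<string, SemaphoreSlim>();

    public static async Task<T> GetOrCreateAsync<T>(this ICache cache, string key, Func<Task<T>> factory)
    {
        var value = cache.Get<T>(key);
        if (EqualityComparer<T>.Default.Equals(value, default(T)))
        {
            var gate = _locks.GetOrAdd(key, _ => new SemaphoreSlim(1, 1));
            await gate.WaitAsync().ConfigureAwait(false);
            try
            {
                // Double-check after acquiring the semaphore.
                value = cache.Get<T>(key);
                if (EqualityComparer<T>.Default.Equals(value, default(T)))
                {
                    value = await factory().ConfigureAwait(false);
                    if (!EqualityComparer<T>.Default.Equals(value, default(T)))
                    {
                        cache.Set(key, value);
                    }
                }
            }
            finally
            {
                gate.Release();
            }
        }
        return value;
    }
}
```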