Background: I'm working on an ASP.NET (classic) WebAPI that makes use of a data-set retrieved from an external web service to service its requests.
- The data-set is immutable
- The data-set is reasonably expensive to fetch from the external web service (could take 5-10 seconds).
- Once the data-set has been fetched, accessing the data needs to be fast
- The data-set needs to be periodically updated (replaced) in the background.
- The periodic updates must not affect the performance of code that's reading the data-set
- It's not a problem if different threads see different versions of the immutable data-set; however, any given call to IDataSetProvider.GetLatestAsync must return the most recently fetched data-set.
- It's not a problem if calls to IDataSetProvider.GetLatestAsync take a while when the AppPool is started/restarted (i.e. data not fetched yet); however, subsequent calls should be essentially instant.
Implementation Summary:
- A DataSetProvider class that holds a reference to the "latest" data-set value & exposes an interface for consuming code to access the latest value. This is registered as a single instance in the IoC container.
- A worker that starts a long-running task responsible for updating the "latest" data-set value on the DataSetProvider. This worker is again single-instance in the IoC container and is automatically activated (using Ninject here).
Questions:
- I don't think I need to use a semaphore to protect access to the _latestDataSet field because updates to references in C# are atomic & it's only ever going to be updated by one thread at a time (could be different threads due to async continuations). Am I correct about this?
- Do I need the volatile keyword on _latestDataSet to ensure the thread calling IDataSetProvider.GetLatestAsync gets the latest value (and doesn't, for example, get null forever due to JIT optimizations or whatever)?
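To make the second question concrete, here is a minimal sketch of the alternative to the volatile keyword: explicit Volatile.Read / Volatile.Write calls from System.Threading. Snapshot and SnapshotHolder are hypothetical stand-ins for the real DataSet and DataSetProvider, so this illustrates the pattern under those assumptions rather than the actual implementation.

```csharp
using System.Threading;

// Stand-in for the real immutable data-set type (hypothetical, for illustration only).
public sealed class Snapshot
{
    public Snapshot(int version) { Version = version; }
    public int Version { get; }
}

// Hypothetical holder: a single writer publishes, many readers observe.
public sealed class SnapshotHolder
{
    private Snapshot _latest; // deliberately not marked volatile

    // Reference writes are atomic in C#; Volatile.Write adds release semantics, so the
    // fully constructed Snapshot is visible to other threads before the reference is.
    public void Publish(Snapshot snapshot) => Volatile.Write(ref _latest, snapshot);

    // Volatile.Read adds acquire semantics, so every access performs a fresh read of the field.
    public Snapshot Current => Volatile.Read(ref _latest);
}
```

Either form should give readers a reference that is never torn; the open question is only whether the plain field read could be cached without one of them.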
Implementation (unimportant details elided for brevity):
public interface IDataSetProvider
{
    Task<DataSet> GetLatestAsync();
}

public interface IDataSetUpdater
{
    void Update(DataSet latestDataSet);
}

public class DataSetProvider : IDataSetProvider, IDataSetUpdater
{
    private static readonly TimeSpan InitialFetchTimeout = TimeSpan.FromSeconds(20);
    private static readonly TimeSpan InitialDataPollingInterval = TimeSpan.FromMilliseconds(100);
    private volatile DataSet _latestDataSet;

    public async Task<DataSet> GetLatestAsync()
    {
        TimeSpan elapsed = TimeSpan.Zero;
        // Updated by DataSetProviderWorker task. Only null briefly on application pool startup.
        while (_latestDataSet == null)
        {
            if (elapsed >= InitialFetchTimeout)
            {
                throw new InvalidOperationException($"Data has not been populated & timeout ({InitialFetchTimeout}) reached.");
            }
            await Task.Delay(InitialDataPollingInterval);
            elapsed += InitialDataPollingInterval;
        }
        return _latestDataSet;
    }

    public void Update(DataSet latestDataSet)
    {
        _latestDataSet = latestDataSet;
    }
}

public class DataSetProviderWorker : IStartable
{
    private static readonly TimeSpan Period = TimeSpan.FromSeconds(20);
    private readonly IDataSetUpdater _dataSetUpdater;
    private readonly CancellationTokenSource _cancellationTokenSource;

    public void Start()
    {
        new TaskFactory().StartNew(RunAsync, _cancellationTokenSource.Token, TaskCreationOptions.LongRunning);
    }

    private async Task RunAsync(object obj)
    {
        while (!_cancellationTokenSource.IsCancellationRequested)
        {
            try
            {
                await DoWorkAsync().ConfigureAwait(false);
            }
            catch (System.Exception e)
            {
                _logger.LogError(e, $"{nameof(DataSetProviderWorker)} encountered an unhandled exception in the execution loop. Will retry in {Period}");
            }
            // Delay execution even if DoWorkAsync threw an exception. Could be a transient error, in which case we don't want to spam
            await Task.Delay(Period).ConfigureAwait(false);
        }
    }

    private async Task DoWorkAsync()
    {
        var immutableData = await _externalDataSource.FetchData();
        var latestDataSet = new DataSet(immutableData);
        _dataSetUpdater.Update(latestDataSet);
    }

    public void Stop()
    {
        _cancellationTokenSource.Cancel();
    }
}
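
The intended startup behaviour (the first call waits for data, later calls return immediately) can be exercised with a small sketch like the one below. MiniProvider and StartupDemo are hypothetical names: MiniProvider is a stripped-down copy of DataSetProvider with string standing in for DataSet and shorter timings, so this is an illustration under those assumptions and not the production code.

```csharp
using System;
using System.Threading.Tasks;

// Stripped-down stand-in for DataSetProvider (hypothetical; string replaces DataSet).
public sealed class MiniProvider
{
    private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(2);
    private static readonly TimeSpan PollingInterval = TimeSpan.FromMilliseconds(20);
    private volatile string _latest;

    public async Task<string> GetLatestAsync()
    {
        var elapsed = TimeSpan.Zero;
        // Poll until the first value has been published or the timeout is reached.
        while (_latest == null)
        {
            if (elapsed >= Timeout)
                throw new InvalidOperationException("Data has not been populated.");
            await Task.Delay(PollingInterval);
            elapsed += PollingInterval;
        }
        return _latest;
    }

    public void Update(string latest) => _latest = latest;
}

public static class StartupDemo
{
    // Starts a reader before any data exists, then publishes twice.
    public static async Task<string> RunAsync()
    {
        var provider = new MiniProvider();
        Task<string> firstCall = provider.GetLatestAsync(); // polls: nothing published yet

        await Task.Delay(100);   // simulate the slow initial fetch
        provider.Update("v1");   // the worker publishes the first data-set

        string first = await firstCall;                   // completes after the publish
        provider.Update("v2");                            // background refresh replaces the value
        string second = await provider.GetLatestAsync();  // returns without polling
        return first + "," + second;
    }
}
```

Awaiting StartupDemo.RunAsync() yields "v1,v2": the first caller waits out the initial fetch, and the second call sees the replaced value straight away.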