I'm trying to expose an observable sequence that gives observers all existing records in a database table plus any future items. For the sake of argument, lets say it's log entries. Therefore, I'd have something like this:
public class LogService
{
private readonly Subject<LogEntry> entries;
public LogService()
{
this.entries = new Subject<LogEntry>();
this.entries
.Buffer(...)
.Subscribe(async x => WriteLogEntriesToDatabaseAsync(x));
}
public IObservable<LogEntry> Entries
{
get { return this.entries; }
}
public IObservable<LogEntry> AllLogEntries
{
get
{
// how the heck?
}
}
public void Log(string message)
{
this.entries.OnNext(new LogEntry(message));
}
private async Task<IEnumerable<LogEntry>> GetLogEntriesAsync()
{
// reads existing entries from DB table and returns them
}
private async Task WriteLogEntriesToDatabaseAsync(IList<LogEntry> entries)
{
// writes entries to the database
}
}
My initial thought for the implementation of AllLogEntries
was something like this:
return Observable.Create<LogEntry>(
async observer =>
{
var existingEntries = await this.GetLogEntriesAsync();
foreach (var existingEntry in existingEntries)
{
observer.OnNext(existingEntry);
}
return this.entries.Subscribe(observer);
});
But the problem with this is that there could log entries that have been buffered and not yet written to the database. Hence, those entries will be missed because they are not in the database and have already passed through the entries
observable.
My next thought was to separate the buffered entries from the non-buffered and use the buffered when implementing AllLogEntries
:
return Observable.Create<LogEntry>(
async observer =>
{
var existingEntries = await this.GetLogEntriesAsync();
foreach (var existingEntry in existingEntries)
{
observer.OnNext(existingEntry);
}
return this.bufferedEntries
.SelectMany(x => x)
.Subscribe(observer);
});
There are two problems with this:
- It means clients of
AllLogEntries
also have to wait for the buffer timespan to pass before they receive their log entries. I want them to see log entries instantaneously. - There is still a race condition in that log entries could be written to the database between the point at which I finish reading the existing ones and the point at which I return the future entries.
So my question is: how would I actually go about achieving my requirements here with no possibility of race conditions, and avoiding any major performance penalties?