2

Say I have an Observable<LogEntry>, where LogEntry has a property TimeStamp (among others of course). The LogEntrys are created by reading a logfile.

public IObservable<LogEntry> GetLogRecords()
{
    return Observable.Create<LogEntry>(
    (IObserver<LogEntry> observer) =>
    {
        var lines = File.ReadLines(this.filePath);
        var enumerator = lines.GetEnumerator();

        while (enumerator.MoveNext())
        {
            string line = enumerator.Current;

            if (!line.StartsWith("#"))
            {
                var entry = new LogEntry(line);
                observer.OnNext(entry);
            }
        }

        observer.OnCompleted();
        return Disposable.Create(() => Console.WriteLine("Unsubscribed"));

    });
}

That Observable will 'fire' as fast as the file is read.

What I want is to space (delay) the events by the time span between the second last and the last LogEntry.TimeStamp

eg. something along the line

var replayObs = GetLogEntries().Delay(calculatedTimeDiff);
codeclash
  • 2,053
  • 19
  • 17
  • Do you read the whole file each time something is written to the file? The performance will degrade pretty fast... – Dialecticus Sep 17 '14 at 10:01
  • No, that's just for demo purpose. Before "tailing" a production log file, I want to test my stuff "offline", hence replaying an archived log file. – codeclash Sep 17 '14 at 10:09
  • This is really a duplicate of http://stackoverflow.com/questions/22255205/observable-from-list-of-timetamps/22256800#22256800 – James World Sep 17 '14 at 14:15

2 Answers2

3

Create a regular enumerable over your log file events (which could be lazy or not as you desire), and then see Observable from list of timetamps. This is practically a duplicate question so I have marked it as such.

Community
  • 1
  • 1
James World
  • 29,019
  • 9
  • 86
  • 120
  • And to think I forgot all about `Generate` even though it bit me before :/ I really have problems thinking with Reactive Extensions – samy Sep 17 '14 at 14:58
0

I think you have to combine observables and be explicit for all calls to do it. First let's say you logentry contains some time info. I will let you see how to calculate the delay till the next entry, but for now lets assume your logentry contains the delay directly. A good way to find his delay would be to take the logfile first entry as a relative starting point.

public class LogEntry
{
    public int delay { get; set; }
}

public static LogEntry readLogEntryFromLine(string line)
{
    return new LogEntry() {delay = Int32.Parse(line)};
}

We setup some data

var lines = new string[] {"0","1","1","2","3","7"};
// this would be the lines you read from the file

var logEntry1 = readLogEntryFromLine(lines[0]) ;

What you need to do is start by creating an observable pushing the first logentries you need (let's say in your case that you want one of them; adding a second one is easy, see the Concat signature)

var firstTwoLogEntries = 
    Observable.Concat(  Observable.Return(logEntry1));

Then for each new line in your file, you create a new LogEntry that you add to the observable with the expected delay:

var finalObservable = firstTwoLogEntries;
var delayOfLatestLogEntry = logEntry1.delay;

foreach (var line in lines.Skip(1)) // we skip the filrst logentry that is our reference
{
    var logEntry = readLogEntryFromLine(line);
    finalObservable = Observable.Concat(finalObservable, Observable.Return(logEntry).Delay(TimeSpan.FromSeconds(delayOfLatestLogEntry)));
    delayOfLatestLogEntry = logEntry.delay;
}

Finally suscribe to your observable

finalObservable.Subscribe((le) => Console.WriteLine("log event at " + DateTime.Now + " - next one in " + le.delay + " seconds"));
samy
  • 14,832
  • 2
  • 54
  • 82