I've been looking at wrapping a file watcher in an observable to aid in processing events, but I'm having some trouble figuring out how to get the behaviour I want out of it. The file watcher watches a directory into which files are placed. When a file is first placed into that directory, the Created event is fired on the file watcher. However, if the file is large or the network connection is slow, a series of Changed events is fired as the file updates. I don't want to process the file until it has finished being written, so what I really need is this timeline:

|Created    |Changed   |Changed   |Changed                      
________________________________________________ 
^Write starts       ^Write finishes       ^Processing Starts    

I looked at a number of methods of filtering the events in Rx, but I couldn't get what I need, which is "fire a function once a file has not been changed for X seconds". Throttle is no good as it will lose events in the middle. Buffer is no good as events might occur on the buffer boundary.

I had thought about using timeouts, but I wasn't crazy about the fact that they throw an exception, and I wanted processing of a finished file to start while other files were still being written, not only once there were no more events at all.

There is a similar question at Reactive Extensions vs FileSystemWatcher which was never really solved.

Is there a method such that I can do this easily? I'm sure this is not an uncommon use case.

stimms

4 Answers

ObservableFileSystemWatcher, an observable wrapper around the FileSystemWatcher type, works perfectly. Add the NuGet package named ReactiveFileSystemWatcher and create a console application to test it as follows:

class Program
{
  static void Main(string[] args)
  {
    using (var watcher = new ObservableFileSystemWatcher(c => { c.Path = @"C:\FolderToWatch\"; c.IncludeSubdirectories = true; }))
    {
      watcher.Created.Select(x => $"{x.Name} was {x.ChangeType}").Subscribe(Console.WriteLine);
      watcher.Changed.Select(x => $"{x.Name} was {x.ChangeType}").Subscribe(Console.WriteLine);
      watcher.Renamed.Select(x => $"{x.OldName} was {x.ChangeType} to {x.Name}").Subscribe(Console.WriteLine);
      watcher.Deleted.Select(x => $"{x.Name} was {x.ChangeType}").Subscribe(Console.WriteLine);
      watcher.Errors.Subscribe(Console.WriteLine);
      watcher.Start();
      Console.ReadLine();
    }
  }
}
zx485

EDIT: after review, don't think you want this...

Mayhap I'm oversimplifying a bit, but wouldn't Throttle be ideal here?

This is by no means "simple", but I think it does what you want closer than my previous idea:

(bonus: with a test case! ;) )

void Main()
{
    var pathToWatch = @"c:\temp\";
    var fsw = new FileSystemWatcher(pathToWatch);

    // set up observables for create and changed
    var changedObs = 
       Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
          dlgt => fsw.Changed += dlgt, 
          dlgt => fsw.Changed -= dlgt);
    var createdObs = 
       Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>( 
          dlgt => fsw.Created += dlgt, 
          dlgt => fsw.Created -= dlgt);

    // the longest we'll wait between last file write and calling it "changed"
    var maximumTimeBetweenWrites = TimeSpan.FromSeconds(1);

    // A "pulse" ticking off every 10ms (adjust this as desired)
    var timer = Observable
        .Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(10))
        .Select(i => DateTime.Now);

    var watcher = 
        from creation in createdObs
        from change in changedObs
            // we only care about changes matching a create
            .Where(changeEvt => changeEvt.EventArgs.Name == creation.EventArgs.Name)
            // take latest of (pulse, changes) and select (event, time since last file write)
            .CombineLatest(timer, (evt, now) => new {
                    Change = evt, 
                    DeltaFromLast = now.Subtract(new FileInfo(evt.EventArgs.FullPath).LastWriteTime)})
            // skip all until we reach the "time before considered changed" threshold
            .SkipWhile(evt => evt.DeltaFromLast < maximumTimeBetweenWrites)
            // Then lock on that until we change a diff file
            .Distinct(evt => evt.Change.EventArgs.FullPath)
        select change.Change;

    var disp = new CompositeDisposable();

    // to show creates
    disp.Add(
        createdObs.Subscribe(
           evt => Console.WriteLine("New file:{0}", 
                evt.EventArgs.FullPath)));

    // to show "final changes"
    disp.Add(
        watcher.Subscribe(
           evt => Console.WriteLine("{0}:{1}:{2}", 
                 evt.EventArgs.Name, 
                 evt.EventArgs.ChangeType, 
                 evt.EventArgs.FullPath)));

    fsw.EnableRaisingEvents = true;

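    // test case: ten files written in parallel, each appended to 20 times
    Enumerable.Range(0,10)
        .AsParallel()
        .ForAll(i => 
            {
                // System.Random is not thread-safe, so each worker gets its own instance
                var rnd = new Random(i);
                var filename = Path.Combine(pathToWatch, "foo" + i + ".txt");
                if(File.Exists(filename))
                    File.Delete(filename);

                foreach(var j in Enumerable.Range(0, 20))
                {
                    // dispose the writer even if the write throws
                    using(var writer = File.AppendText(filename))
                    {
                        writer.WriteLine(j);
                    }
                    Thread.Sleep(rnd.Next(500));
                }
            });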

    Console.WriteLine("Press enter to quit...");
    Console.ReadLine();
    disp.Dispose();        
}
JerKimball
  • I haven't thrown this against my unit tests but I don't think it will work because changed events are not always fired. The sequence is that you have a single Created event followed by 0 or more Changed events. I'm also not convinced that this addresses the possibility that you might have a file write which lasts longer than 5 seconds. – stimms Jan 08 '13 at 20:03
  • @stimms Actually, completely forgot I answered this...looking it over, I think this won't be what you're after...let me update this a bit... – JerKimball Jan 08 '13 at 21:43
  • @stimms ok, a new take on things - the "create" should let you start processing files as they come in, and the "final change" should signal when a file is "done" – JerKimball Jan 08 '13 at 21:53
  • Thanks so much for your effort on this. I think I finally got something that worked using the BufferWithInactivity but I've learned a ton just reading your code. – stimms Jan 09 '13 at 20:04
  • @stimms no worries, was actually kind of fun (well, maybe not fun...engaging) to write. :) – JerKimball Jan 09 '13 at 21:18

Take a look at my BufferWithInactivity extension method in this answer.

I think you could use it to look for inactivity in the changed events.

Enigmativity
  • I used the buffer with inactivity coupled with a group by function to achieve what I wanted. – stimms Jan 09 '13 at 20:03
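
For anyone landing here later, the idea in the comment above (group events by file, then wait for inactivity per file) might look roughly like the following. This is only a sketch and not the code stimms ended up with: the BufferWithInactivity extension is not reproduced in this thread, so the sketch substitutes the built-in System.Reactive Throttle operator, which only emits a value once no further value has arrived within the given time span. The WhenQuiet name, the c:\temp\ path and the 5 second quiet period are assumptions made for illustration.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;

static class QuietFileWatcher
{
    // Hypothetical helper: emits a path once no event has been seen
    // for that same path during the `quiet` interval.
    public static IObservable<string> WhenQuiet(
        this IObservable<string> paths, TimeSpan quiet) =>
        paths
            // one inner sequence per file, so files do not reset each other's timer
            .GroupBy(path => path)
            // Throttle drops every value that is followed by another one within `quiet`
            .SelectMany(group => group.Throttle(quiet));

    static void Main()
    {
        // assumed directory, adjust as needed
        using var fsw = new FileSystemWatcher(@"c:\temp\");

        var created = Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
            h => fsw.Created += h, h => fsw.Created -= h);
        var changed = Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
            h => fsw.Changed += h, h => fsw.Changed -= h);

        // Created is merged in so a file with zero Changed events is still reported
        using var subscription = created.Merge(changed)
            .Select(evt => evt.EventArgs.FullPath)
            .WhenQuiet(TimeSpan.FromSeconds(5))
            .Subscribe(path => Console.WriteLine("Ready to process: {0}", path));

        fsw.EnableRaisingEvents = true;
        Console.WriteLine("Press enter to quit...");
        Console.ReadLine();
    }
}
```

Because Created and Changed are merged before grouping, the quiet timer starts at creation, which should cover the "Created followed by 0 or more Changed events" case raised earlier. A file that is modified again after its quiet period will be reported again; appending Take(1) after Throttle inside the SelectMany would limit that to one notification per file, at the cost of ignoring later rewrites.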

Check out the NuGet package Reactive FileSystemWatcher.

The source code is on the GitHub page.

Contango