
I would like to generate an observable of files such that the discovery of the file names can be cancelled at any moment. For the sake of this example, the cancellation happens automatically after 1 second.

Here is my current code:

class Program
{
    static void Main()
    {
        try
        {
            RunAsync(@"\\abc\xyz").GetAwaiter().GetResult();
        }
        catch (Exception exc)
        {
            Console.Error.WriteLine(exc);
        }
        Console.Write("Press Enter to exit");
        Console.ReadLine();
    }

    private static async Task RunAsync(string path)
    {
        var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
        await GetFileSource(path, cts);
    }

    private static IObservable<string> GetFileSource(string path, CancellationTokenSource cts)
    {
        return Observable.Create<string>(obs => Task.Run(async () =>
        {
            Console.WriteLine("Inside Before");
            foreach (var file in Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories).Take(50))
            {
                cts.Token.ThrowIfCancellationRequested();
                obs.OnNext(file);
                await Task.Delay(100);
            }
            Console.WriteLine("Inside After");
            obs.OnCompleted();
            return Disposable.Empty;
        }, cts.Token))
        .Do(Console.WriteLine);
    }
}

I do not like two aspects of my implementation (if there are more, please feel free to point them out):

  1. I have an enumerable of files, yet I iterate over each one manually. Could I use the `ToObservable` extension somehow?
  2. I could not figure out how to make use of the `cts.Token` passed to `Task.Run`. I had to use the `cts` captured from the outer context (the `GetFileSource` parameter), which seems ugly to me.

Is this how it should be done? There must be a better way.

mark
  • This doesn't seem like a terribly reactive problem since you are really just enumerating over the collection. What is causing the cancellation? Have you had a look at Parallel.ForEach or PLinq, which also support mid-iteration cancellation? – paulpdaniels Jul 21 '15 at 00:07
  • It is a stripped down example. The real logic is much more complicated. – mark Jul 21 '15 at 00:23
  • As a general rule - if you find yourself doing `return Disposable.Empty;` then you are almost certainly doing something wrong. – Enigmativity Jul 21 '15 at 03:24
  • Related: [How to cancel an observable sequence](https://stackoverflow.com/questions/6759833/how-to-cancel-an-observable-sequence) – Theodor Zoulias Dec 08 '20 at 16:13

2 Answers


I would recommend that you avoid Observable.Create when you can use the other operators.

Also, when you `return Disposable.Empty;` within `Observable.Create`, you create an observable that cannot be stopped by disposing the normal Rx subscription. This can lead to memory leaks and unnecessary processing.

Finally, throwing exceptions to end normal computation is a bad bad idea.

There is a good clean solution that seems to do what you want:

private static IObservable<string> GetFileSource(string path, CancellationTokenSource cts)
{
    return
        Directory
            .EnumerateFiles(path, "*", SearchOption.AllDirectories)
            .ToObservable()
            .Take(50)
            .TakeWhile(f => !cts.IsCancellationRequested);
}

The only thing that I didn't include was the `Task.Delay(100)`. Why are you doing that?
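
If the per-file asynchronous work has to stay inside the producer, one possible alternative (a sketch, not part of the answer above) is the `Observable.Create` overload that takes a `Func<IObserver<T>, CancellationToken, Task>`. Rx supplies the token itself and cancels it when the subscription is disposed, so there is no need to capture a `CancellationTokenSource` or to return `Disposable.Empty`. The class name `FileSourceSketch` and the 100 ms delay are assumptions for illustration only.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

static class FileSourceSketch
{
    // Hypothetical variant of GetFileSource; the token comes from Rx, not from the caller.
    public static IObservable<string> GetFileSource(string path)
    {
        return Observable.Create<string>(async (IObserver<string> obs, CancellationToken ct) =>
        {
            foreach (var file in Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories))
            {
                // ct is cancelled when the subscription is disposed,
                // so just stop instead of throwing.
                if (ct.IsCancellationRequested)
                    return;

                obs.OnNext(file);

                // Stands in for the real per-file asynchronous work (assumption).
                await Task.Delay(100);
            }

            obs.OnCompleted();
        });
    }
}
```

Anything that disposes the subscription then stops the enumeration at the next check, for example a `TakeUntil` with a timer or an explicit `Dispose()` on the value returned by `Subscribe`.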

Enigmativity
  • The delay is there to simulate asynchronous work that is happening for every fetched file. – mark Jul 21 '15 at 14:22
  • Your solution has slightly different semantics; I am not sure yet whether it is suitable for me. In the original example, the file enumerator does not proceed to the next file until certain asynchronous processing is over (simulated with `Task.Delay`). In your code (and please remove `Take(50)`, because this number does not have any significance) I can add `SelectMany` after `TakeWhile` to run the asynchronous post processing, but it would not be the same, because the file enumerator is going to fetch significantly more files, most of which would not be post processed because of the impending timeout. – mark Jul 21 '15 at 14:35
  • And there is another thing. If I add post processing after `TakeWhile` it runs concurrently and totally unrestricted, which is also a change in semantics from the original, where the post processing was asynchronous, but not concurrent. But suppose it is good and we do want a concurrent post processing. As it is, it would be totally unrestricted, and if I do restrict it (using Select + Merge(int)), then the call to `GetFileSource` is not going to obey the timeout, since we have fetched so many file names in the first place. Again, I have to think if it is good for me. – mark Jul 21 '15 at 14:48

I'm still not convinced this is really a Reactive problem. You are asking for backpressure on the producer, which goes against how Reactive is supposed to work.

That being said, if you are going to do it this way you should realize that very fine-grained time manipulation should almost always be delegated to a Scheduler rather than trying to do coordination with Tasks and CancellationTokens. So I would refactor to look like this:

public static IObservable<string> GetFileSource(string path, Func<string, Task<string>> processor, IScheduler scheduler = null) {

  scheduler = scheduler ?? Scheduler.Default;

  return Observable.Create<string>(obs => 
  {
    //Grab the enumerator as our iteration state.
    var enumerator = Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories)
                              .GetEnumerator();
    return scheduler.Schedule(enumerator, async (e, recurse) =>
    {
      if (!e.MoveNext())
      {
         obs.OnCompleted();
         return;
      }

      //Wait here until processing is done before moving on
      obs.OnNext(await processor(e.Current));

      //Recursively schedule
      recurse(e);
    });
  });

}

Then, instead of passing in a cancellation token, use TakeUntil:

var source = GetFileSource(path, async x => { await Task.Delay(100); /* stands in for real async work */ return x; })
    .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1)));
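
To see the `TakeUntil` timeout in isolation, here is a minimal self-contained sketch. It uses `Observable.Interval` as a stand-in for the file source (an assumption, so that it runs without a file share), and the names `TakeUntilSketch` and `CountTicksAsync` are made up for the example.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class TakeUntilSketch
{
    public static async Task<int> CountTicksAsync()
    {
        // Stand-in source: emits a value every 100 ms and never completes on its own.
        var source = Observable.Interval(TimeSpan.FromMilliseconds(100));

        // When the timer fires, TakeUntil completes the sequence and
        // unsubscribes from the source; no CancellationToken is involved.
        return await source
            .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1)))
            .Count();
    }
}
```

The awaited result is the number of values that arrived before the one-second timer fired; the exact count depends on timing.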

You can also see a more advanced example for an implementation of an async Generate method.

paulpdaniels