6

With the release of MediatR 10, there's now a paradigm that allows developers to create streams powered by IAsyncEnumerable. I'm using this paradigm to create several file system watchers that monitor multiple folders. To monitor the folders, I'm using two different approaches: polling and FileSystemWatcher. As part of my pipeline, all of the folder monitors are aggregated into a single IEnumerable<IAsyncEnumerable<FileRecord>>. Each type of watcher has an internal loop that runs until cancellation is requested via a CancellationToken.

Here's the polling watcher:

/// <summary>
/// Streams files that appear in a folder by re-scanning it every <c>request.Interval</c>.
/// </summary>
/// <remarks>
/// Each scan marks every not-yet-seen file in the <see cref="ISeenFileStore"/>, publishes a
/// <see cref="FileSeenNotification"/> for it, and yields the whole batch once the scan finishes.
/// </remarks>
public class PolledFileStreamHandler : 
    IStreamRequestHandler<PolledFileStream, FileRecord>
{
    private readonly ISeenFileStore _seenFileStore;
    private readonly IPublisher _publisher;
    private readonly ILogger<PolledFileStreamHandler> _logger;

    public PolledFileStreamHandler(
        ISeenFileStore seenFileStore, 
        IPublisher publisher, 
        ILogger<PolledFileStreamHandler> logger)
    {
        (_seenFileStore, _publisher, _logger) = (seenFileStore, publisher, logger);
    }

    /// <summary>
    /// Polls <c>request.Folder</c> until <paramref name="cancellationToken"/> is cancelled,
    /// yielding a <see cref="FileRecord"/> for each newly discovered file.
    /// </summary>
    public async IAsyncEnumerable<FileRecord> Handle(
        PolledFileStream request, 
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        // Filled by the parallel scan, then drained into the stream.
        var pending = new ConcurrentQueue<FileRecord>();

        while (!cancellationToken.IsCancellationRequested)
        {
            // Lazy query: Contains runs while the loop below is already calling Add.
            // NOTE(review): this presumably requires ISeenFileStore to be thread-safe — confirm.
            var unseen = Directory
                .EnumerateFiles(request.Folder)
                .Where(path => !_seenFileStore.Contains(path));

            // CancellationToken.None: a started scan always runs to completion, so every file
            // marked as seen is also yielded below.
            await Parallel.ForEachAsync(unseen, CancellationToken.None, async (path, token) =>
            {
                var found = new FileRecord(path);

                _seenFileStore.Add(path);
                await _publisher.Publish(new FileSeenNotification { FileInfo = found }, token);
                pending.Enqueue(found);
            });

            // TODO: try interleaving the parallel scan with serving results (might be chaotic).

            // The scan has finished, so nothing else enqueues while this drains.
            while (pending.TryDequeue(out var next))
            {
                yield return next;
            }

            _logger.LogInformation("PolledFileStreamHandler watching {Directory} at: {Time}", request.Folder, DateTimeOffset.Now);

            // Swallow the delay's cancellation; the loop condition ends the stream instead.
            var pause = Task.Delay(request.Interval, cancellationToken);
            await pause.ContinueWith(_ => { }, CancellationToken.None);
        }
    }
}

And here's the FileSystemWatcher-based handler:

/// <summary>
/// Streams files created in a folder, as reported by a <see cref="FileSystemWatcher"/>.
/// </summary>
/// <remarks>
/// Each new path is recorded in the <see cref="ISeenFileStore"/>, queued for the stream, and
/// published as a <see cref="FileSeenNotification"/>. The queue is an instance field, so
/// concurrent <c>Handle</c> calls on the same instance share (and compete for) it.
/// </remarks>
public class FileSystemStreamHandler : 
    IStreamRequestHandler<FileSystemStream, FileRecord>
{
    private readonly ISeenFileStore _seenFileStore;
    private readonly ILogger<FileSystemStreamHandler> _logger;
    private readonly IPublisher _publisher;
    // Written by the watcher's event callback, drained by Handle.
    private readonly ConcurrentQueue<FileRecord> _queue;

    public FileSystemStreamHandler(
        ISeenFileStore seenFileStore, 
        ILogger<FileSystemStreamHandler> logger, 
        IPublisher publisher)
    {
        _seenFileStore = seenFileStore;
        _logger = logger;
        _publisher = publisher;
        _queue = new ConcurrentQueue<FileRecord>();
    }

    /// <summary>
    /// Watches <c>request.Folder</c> and yields a <see cref="FileRecord"/> for each newly created
    /// file until <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    public async IAsyncEnumerable<FileRecord> Handle(
        FileSystemStream request, 
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        // A local (not a field) keeps the exact delegate for unsubscription; a field would be
        // overwritten by a second concurrent Handle call.
        FileSystemEventHandler onCreated = (_, args) => OnWatcherOnChanged(args, cancellationToken);
        using var watcher = SetupWatcher(request.Folder, onCreated);

        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                // Drain everything queued so far instead of one item per 100 ms tick.
                while (!cancellationToken.IsCancellationRequested && _queue.TryDequeue(out var next))
                    yield return next;

                // Swallow the delay's cancellation; the loop condition ends the stream instead.
                await Task.Delay(100, cancellationToken)
                    .ContinueWith(_ => { }, CancellationToken.None);
            }
        }
        finally
        {
            // Runs on normal completion and also when the consumer stops early (break / DisposeAsync);
            // the using declaration then disposes the watcher.
            TearDownWatcher(watcher, onCreated);
        }
    }
    
    /// <summary>
    /// Creates a watcher on <paramref name="folder"/>, subscribes <paramref name="onCreated"/> to its
    /// Created event, and starts raising events. The watcher is disposed if start-up fails.
    /// </summary>
    private static FileSystemWatcher SetupWatcher(string folder, FileSystemEventHandler onCreated)
    {
        var watcher = new FileSystemWatcher(folder)
        {
            NotifyFilter = NotifyFilters.Attributes
                           | NotifyFilters.CreationTime
                           | NotifyFilters.DirectoryName
                           | NotifyFilters.FileName
                           | NotifyFilters.LastAccess
                           | NotifyFilters.LastWrite
                           | NotifyFilters.Security
                           | NotifyFilters.Size
        };

        try
        {
            // Subscribe before enabling events so nothing raised in between is missed.
            watcher.Created += onCreated;
            watcher.EnableRaisingEvents = true;
        }
        catch
        {
            watcher.Dispose();
            throw;
        }

        return watcher;
    }
    
    /// <summary>
    /// Handles a Created event: skips paths already seen and directories; otherwise queues a
    /// <see cref="FileRecord"/> and publishes a <see cref="FileSeenNotification"/>.
    /// </summary>
    /// <remarks>
    /// This is <c>async void</c> because it runs as an event callback, so every exception is
    /// handled here: an exception escaping an <c>async void</c> method cannot be observed by any
    /// caller and can take down the process.
    /// </remarks>
    private async void OnWatcherOnChanged(FileSystemEventArgs args, CancellationToken cancellationToken)
    {
        var path = args.FullPath;

        try
        {
            if (_seenFileStore.Contains(path)) return;

            _seenFileStore.Add(path);

            try
            {
                if ((File.GetAttributes(path) & FileAttributes.Directory) != 0) return;
            }
            catch (IOException ex) when (ex is FileNotFoundException or DirectoryNotFoundException)
            {
                // The file (or its folder) disappeared before it could be inspected.
                _logger.LogWarning("File {File} was not found. During a routine check. Will not be broadcast", path);
                return;
            }

            var fileRecord = new FileRecord(path);
            _queue.Enqueue(fileRecord);
            await _publisher.Publish(new FileSeenNotification { FileInfo = fileRecord }, cancellationToken);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // The stream is shutting down; dropping the notification is expected.
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to process created file {File}", path);
        }
    }

    /// <summary>
    /// Stops event delivery and unsubscribes <paramref name="onCreated"/>. The caller disposes the watcher.
    /// </summary>
    private static void TearDownWatcher(FileSystemWatcher watcher, FileSystemEventHandler onCreated)
    {
        watcher.EnableRaisingEvents = false;
        watcher.Created -= onCreated;
    }
}

Finally, here's the class that ties everything together and attempts to monitor the streams (in the StartAsync method). You'll notice the Merge operator from System.Interactive.Async; it does not currently behave as desired.

/// <summary>
/// Aggregates every registered <see cref="IFileStream"/> request into one concurrently merged
/// stream and logs each incoming file until stopped.
/// </summary>
public class StreamedFolderWatcher : IDisposable
{
    // Factories, not streams: each run creates fresh streams bound to that run's token, so
    // StopAsync (which cancels the run's token source) actually stops them.
    private readonly ConcurrentBag<Func<CancellationToken, IAsyncEnumerable<FileRecord>>> _streams;
    private CancellationTokenSource? _cancellationTokenSource;
    private readonly IMediator _mediator;
    private readonly ILogger<StreamedFolderWatcher> _logger;

    public StreamedFolderWatcher(
        IMediator mediator,
        IEnumerable<IFileStream> fileStreams, 
        ILogger<StreamedFolderWatcher> logger)
    {
        _mediator = mediator;
        _logger = logger;
        _streams = new ConcurrentBag<Func<CancellationToken, IAsyncEnumerable<FileRecord>>>();

        foreach (var fileStream in fileStreams)
            AddStream(fileStream);
    }

    /// <summary>Registers a factory that creates the MediatR stream for <paramref name="request"/>.</summary>
    private void AddStream<T>(T request) 
        where T : IStreamRequest<FileRecord>
    {
        _streams.Add(token => _mediator.CreateStream(request, token));
    }

    /// <summary>
    /// Merges all registered streams and logs each file as soon as any stream yields it. Runs
    /// until <paramref name="cancellationToken"/> is cancelled or <see cref="StopAsync"/> is called;
    /// if every stream completes, they are restarted after a one-second pause.
    /// </summary>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        _cancellationTokenSource = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken);
        var token = _cancellationTokenSource.Token;

        while (!token.IsCancellationRequested)
        {
            // Must be an array passed to AsyncEnumerableEx.Merge: the
            // IEnumerable<IAsyncEnumerable<T>> extension overload does not enumerate its sources
            // concurrently, so the first never-ending stream would starve the others.
            var streams = _streams.Select(factory => factory(token)).ToArray();

            try
            {
                await foreach (var file in AsyncEnumerableEx.Merge(streams).WithCancellation(token))
                {
                    _logger.LogInformation("Incoming file {File}", file);
                }
            }
            catch (OperationCanceledException) when (token.IsCancellationRequested)
            {
                // Shutdown requested; end quietly.
                break;
            }
            
            await Task.Delay(1000, token)
                .ContinueWith(_ => {}, CancellationToken.None);
        }
    }

    /// <summary>Cancels the current run started by <see cref="StartAsync"/>, if any.</summary>
    public Task StopAsync()
    {
        _cancellationTokenSource?.Cancel();

        return Task.CompletedTask;
    }

    public void Dispose()
    {
        _cancellationTokenSource?.Dispose();
        GC.SuppressFinalize(this);
    }
}

My expectation for the Merge behavior is that if I have 3 IAsyncEnumerables, each item should be emitted as soon as it's yielded. Instead, unless I place yield break somewhere within the loops, the first IStreamRequestHandler fetched will simply execute ad infinitum until the cancellation token forces a stop.

How can I merge multiple input IAsyncEnumerables into a single long-lived output stream, that emits each time a result is yielded?

Minimum Reproducible Sample

// Minimal repro. Each CreateSequence call produces an endless stream tagged with its own Guid,
// emitting a value in [0, 10) after a random 100–999 ms pause.
static async IAsyncEnumerable<(Guid Id, int Value)> CreateSequence(
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    var rng = new Random();
    var sequenceId = Guid.NewGuid();

    while (true)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            yield break;
        }

        // The pause itself does not observe cancellationToken; cancellation is only noticed between items.
        var pause = TimeSpan.FromMilliseconds(rng.Next(100, 1000));
        await Task.Delay(pause);

        yield return (sequenceId, rng.Next(0, 10));
    }
}

// The token source is never cancelled, so the loop below runs until the process is stopped.
var cts = new CancellationTokenSource();
IEnumerable<IAsyncEnumerable<(Guid Id, int Value)>> sources = Enumerable
    .Range(0, 10)
    .Select(_ => CreateSequence(cts.Token));

// Calls the IEnumerable<IAsyncEnumerable<T>> extension overload of Merge, whose behavior the question describes.
await foreach (var (id, value) in sources.Merge())
{
    Console.WriteLine($"[{DateTime.Now.ToShortTimeString()}] Value {value} Emitted from {id}");
}
JD Davis
  • 3,517
  • 4
  • 28
  • 61
  • Does this answer your question? [Asynchronously write to an IAsyncEnumerable](https://stackoverflow.com/questions/70023736/asynchronously-write-to-an-iasyncenumerable) – Raymond Chen Jan 10 '22 at 20:19
  • @RaymondChen I'm not sure. That appears to be tailored towards converting `IEnumerable` to a unified `IAsyncEnumerable`, but I will see if any of it can be repurposed. – JD Davis Jan 10 '22 at 20:28
  • Oof.. I've seen a duplicate of this recently.. – Caius Jard Jan 10 '22 at 21:10
  • This looks like a classical usecase for the producer consumer pattern. Maybe you can enumerate all asyncenumerables into a channel and then return ReadAllAsync. Cf. https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/ – Clemens Jan 10 '22 at 21:14
  • I have come up with a rather brute force solution that utilizes several background tasks. I will post it below. It's definitely far from the best solution, so I'll leave the question open in anticipation of being corrected. – JD Davis Jan 10 '22 at 21:18
  • Could you include a minimal example that reproduces the undesirable behavior of the `Merge` operator? – Theodor Zoulias Jan 10 '22 at 21:47
  • @TheodorZoulias Please see my updated question for my repro. – JD Davis Jan 10 '22 at 22:20
  • 1
    It seems that the issue you are facing is fixed now (as of `System.Interactive.Async` 6.0.1). I just wanted to take a moment and say that I really appreciated this question because it helped me better understand how to work with `IAsyncEnumerable` as well as this library. [Working example](https://github.com/JaimeStill/distributed-architecture/blob/f4637b5488ab4488825c3fd6adecbf2b0ab0d0b3/proto/Picsum/PicsumPhoto.cs) – Jaime Still Jun 18 '22 at 00:48

3 Answers

6

It seems that the Rx team messed up the Merge operator and created overloads with divergent behavior. This overload supports concurrency:

// params-array overload (not an extension method): enumerates all sources concurrently.
public static IAsyncEnumerable<TSource> Merge<TSource>(
    params IAsyncEnumerable<TSource>[] sources);

This overload does not support concurrency:

// IEnumerable extension overload: per the source comment below, it does not enumerate the sources concurrently.
public static IAsyncEnumerable<TSource> Merge<TSource>(
    this IEnumerable<IAsyncEnumerable<TSource>> sources);

From the comments inside the source code:

// REVIEW:
// This implementation does not exploit concurrency. We should not introduce such
// behavior in order to avoid breaking changes, but we could introduce a parallel
// ConcurrentMerge implementation. It is unfortunate though that the Merge
// overload accepting an array has always been concurrent, so we can't change that
// either (in order to have consistency where Merge is non-concurrent, and
// ConcurrentMerge is).

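So what you have to do is materialize your enumerable with .ToArray() before merging. The concurrent overload is not an extension method, so call it as AsyncEnumerableEx.Merge(streams.ToArray()); writing streams.ToArray().Merge() still binds to the non-concurrent extension overload.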

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • 1
    Well isn't that something! That's a fantastic catch. I can confirm that this does appear to correct the issue entirely. Also worth noting, I cannot use the extension method to accomplish this, instead I have to rely on `AsyncEnumerableEx.Merge` instead of `myAsyncEnumerableCollection.Merge()`. – JD Davis Jan 10 '22 at 23:19
0

I managed to come up with a working, but likely inefficient and potentially buggy solution. By putting each IAsyncEnumerable into its own background task, I'm able to emit each into a thread-safe queue, where they're served up as each one comes available.

/// <summary>
/// Merges several asynchronous sequences into one, yielding each element as soon as any source produces it.
/// </summary>
/// <param name="sources">The sequences to merge; each is enumerated on its own thread-pool task.</param>
/// <param name="debounceTime">Optional pause after each drained batch of elements; <c>null</c> means no pause.</param>
/// <param name="cancellationToken">Cancels enumeration of every source; the merged stream then ends normally.</param>
/// <returns>
/// The interleaved elements. Completes once every source has completed or been cancelled;
/// if a source fails, its exception is rethrown to the consumer.
/// </returns>
public static async IAsyncEnumerable<TSource> MergeAsyncEnumerable<TSource>(
    this IEnumerable<IAsyncEnumerable<TSource>> sources,
    TimeSpan? debounceTime = default,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var channel = System.Threading.Channels.Channel.CreateUnbounded<TSource>(
        new System.Threading.Channels.UnboundedChannelOptions { SingleReader = true });

    // Cancelled in the finally below so producers stop when the consumer stops early.
    using var stop = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

    var producers = SetupCollections(sources, channel.Writer, stop.Token);

    // Close the channel once every producer is done. A fault closes it with the first exception,
    // which WaitToReadAsync then rethrows; cancellation closes it normally.
    _ = Task.WhenAll(producers).ContinueWith(
        t => channel.Writer.TryComplete(t.Exception?.InnerException),
        CancellationToken.None,
        TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default);

    try
    {
        // WaitToReadAsync waits without spinning and returns false only after the channel is
        // closed AND empty, so no element written before completion is lost.
        while (await channel.Reader.WaitToReadAsync())
        {
            while (channel.Reader.TryRead(out var record))
                yield return record;

            await WaitIfDebounce(debounceTime, cancellationToken);
        }
    }
    finally
    {
        stop.Cancel();
    }
}

/// <summary>
/// Waits for <paramref name="debounceTime"/> when it has a value; cancellation ends the wait without throwing.
/// </summary>
private static Task WaitIfDebounce(
    TimeSpan? debounceTime,
    CancellationToken cancellationToken)
{
    return debounceTime.HasValue
        ? Task.Delay(debounceTime.Value, cancellationToken)
            .ContinueWith(_ => { }, CancellationToken.None)
        : Task.CompletedTask;
}

/// <summary>
/// Starts one task per source that copies its elements into <paramref name="writer"/>.
/// Cancellation via <paramref name="cancellationToken"/> ends a task normally; other exceptions fault it.
/// </summary>
private static IList<Task> SetupCollections<TSource>(
    IEnumerable<IAsyncEnumerable<TSource>> sources,
    System.Threading.Channels.ChannelWriter<TSource> writer,
    CancellationToken cancellationToken)
{
    return sources
        .Select(source => Task.Run(async () =>
        {
            try
            {
                await foreach (var item in source.WithCancellation(cancellationToken))
                    await writer.WriteAsync(item, cancellationToken);
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Requested shutdown, not a failure.
            }
        }, cancellationToken))
        .ToList();
}
JD Davis
  • 3,517
  • 4
  • 28
  • 61
  • @TheodorZoulias Overall, it's unnecessary, but I added it for my particular use-case as a sort of debounce. – JD Davis Jan 10 '22 at 22:02
  • @TheodorZoulias it also helped get the CPU usage down since I didn't just have an infinite loop sitting there spinning. On my i9-9900k, the app's CPU usage goes from a constant 6-8% down to 0-1%. – JD Davis Jan 10 '22 at 22:11
0

My two cents for merging IAsyncEnumerable streams:

/// <summary>
/// Merges already-created enumerators, yielding each element as soon as any of them produces one.
/// </summary>
/// <param name="enumerators">Enumerators to pull from concurrently. They are not disposed here; the caller owns them.</param>
/// <param name="cancellationToken">
/// Checked before each wait; an in-flight <c>MoveNextAsync</c> cannot be interrupted from here
/// because the enumerators were created by the caller.
/// </param>
/// <exception cref="OperationCanceledException">Cancellation was requested.</exception>
/// <remarks>A failing or cancelled enumerator's exception is rethrown to the consumer.</remarks>
public async IAsyncEnumerable<JobResult> Merge(List<IAsyncEnumerator<JobResult>> enumerators, [EnumeratorCancellation] CancellationToken cancellationToken)
{
    // One pending MoveNextAsync per live enumerator.
    var inProgress = enumerators
        .Select(e => (task: e.MoveNextAsync().AsTask(), enumerator: e))
        .ToList();

    while (inProgress.Count > 0)
    {
        cancellationToken.ThrowIfCancellationRequested();

        await Task.WhenAny(inProgress.Select(item => item.task));

        // Walk backwards so RemoveAt does not shift entries not yet visited; re-armed entries
        // are appended and handled on the next round.
        for (var i = inProgress.Count - 1; i >= 0; i--)
        {
            var (task, enumerator) = inProgress[i];

            // Any completed state counts. Checking only RanToCompletion left faulted/canceled
            // tasks in the list, so WhenAny returned immediately forever.
            if (!task.IsCompleted)
                continue;

            inProgress.RemoveAt(i);

            // Awaiting rethrows a source's exception or cancellation instead of hiding it.
            if (await task)
            {
                yield return enumerator.Current;
                inProgress.Add((enumerator.MoveNextAsync().AsTask(), enumerator));
            }
        }
    }
}
aleksander_si
  • 1,021
  • 10
  • 28