
I'm using Rx to process events in groups of at most X items, or after no new events have been sent for Y ms. For this purpose I use the code suggested in this answer here.

This worked fine until I noticed that, when I wait until all events are processed before continuing, some events stay in the queue unprocessed.

I found out that this happens when a new event is sent while the queue is still busy processing the previous events. If another event is sent after the processing has finished, all events in the queue are processed, including the ones that were stuck previously.

Test code and output

To reproduce the behavior I wrote some sample code which adds numbers to the subject and then waits until the number is processed or a timeout is hit (10 sec). If the number is processed, another number is added. This second number is sent while the processing of the previous number is still running, which leads to the described behavior. The behavior can also be reproduced without the artificial delay, it just won't happen reliably, because the timing has to be just right.

private DispatcherScheduler _schedulerProviderDispatcher = new DispatcherScheduler(Application.Current.Dispatcher);
private Subject<int> _numberEvents = new Subject<int>();
private readonly ConcurrentDictionary<int, object> _numbersInProgress = new ConcurrentDictionary<int, object>();
private IDisposable _disposable;

public async Task SendNumbersAsync()
{
    _disposable = _numberEvents.Synchronize()
        .BufferUntilInactive(TimeSpan.FromMilliseconds(2000), _schedulerProviderDispatcher, 100)
        .Subscribe(numbers =>
        {
            if (numbers.Count == 0)
                return;

            var values = string.Empty;

            // Handle numbers in queue and remove them from the dictionary
            for (var i = 0; i < numbers.Count; ++i)
            {
                var number = numbers[i];
                if (_numbersInProgress.TryRemove(number, out _) == false)
                    Trace.WriteLine($"Failed to remove number '{number}'");

                values += $"{number}, ";
                Trace.WriteLine($"Handled Number: {number}, Count: {i + 1}/{numbers.Count}");
            }

            Trace.WriteLine($"Handled '{numbers.Count}' numbers. Values: '{values}'");

            // delay the execution by 1000ms to simulate a slow processing of the numbers
            // and create a timeframe where new numbers will be sent and received, but the subject is still busy with the previous processing 
            var task = Task.Delay(1000);
            Task.WaitAll(task);

            Trace.WriteLine($"Finished handling '{numbers.Count}' numbers. Values: '{values}'");
        });

    // push numbers to the subject
    var number = 0;
    while (_numbersInProgress.Count == 0)
    {
        ++number;
        Trace.WriteLine($"Sending number: {number}");
        _numberEvents.OnNext(number);
        Trace.WriteLine($"Finished sending number: {number}");

        // add the number to the progress dictionary
        if (_numbersInProgress.TryAdd(number, 0) == false)
            Trace.WriteLine($"Failed to add number '{number}'");

        var waitCount = 0;
        var timedOut = false;
        // wait for the numbers to be processed
        // if we waited 100 times (10sec) stop
        while (_numbersInProgress.Count != 0)
        {
            Trace.WriteLine($"Waiting ({waitCount})");
            await Task.Delay(100);
            ++waitCount;

            if (waitCount > 100)
            {
                timedOut = true;
                break;
            }
        }

        if (timedOut)
            Trace.WriteLine($"Timeout waiting: {number}");
        else
            Trace.WriteLine($"Finished waiting: {number}");
    }

    // if we still have unprocessed numbers
    // send another number to the subject to trigger processing of both numbers
    if (_numbersInProgress.Count != 0)
    {
        // Failed
        Trace.WriteLine($"Failed waiting: {number}");

        Trace.WriteLine($"Sending number: {9999}");
        _numberEvents.OnNext(9999);
        Trace.WriteLine($"Finished sending number: {9999}");
    }
}

And the extension method, used to group the numbers:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, IScheduler scheduler = null, Int32? maxCount = null)
{
    var s = scheduler ?? Scheduler.Default;

    var publish = stream.Publish(p =>
    {
        var closes = p.Throttle(delay, s);

        if (maxCount != null)
        {
            var overflows = p.Where((x, index) => index + 1 >= maxCount);
            closes = closes.Amb(overflows);
        }

        return p.Window(() => closes).SelectMany(window => window.ToList());
    });

    return publish;
}

When executing this method, the trace output shows the following:

- Sending the second number: number 2 is sent before the processing of the previously sent number 1 has finished.

- Timeout of the second number: the wait loop hits the timeout condition after 100 wait cycles and then sends a new number (9999), which triggers the processing of both numbers in the queue.

As you can see, I've already tried to solve this problem by encapsulating the throttle and the grouping of the items (BufferUntilInactive) in a Publish, but without success.

Am I missing anything in the BufferUntilInactive method or somewhere else?

Andi

1 Answer

As described in this comment, the problem is p.Window(() => closes): this overload invokes the window-closing selector again for every new window and re-subscribes to the returned sequence, which opens up a gap in which events can be missed.
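To illustrate the re-subscription (a minimal sketch, not the code from the question): with the selector overload, Rx invokes the factory once when the first window opens and again each time a window closes, so there is a moment between windows where the new closing sequence is not yet subscribed.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var source = new Subject<int>();
var close = new Subject<Unit>();
var factoryCalls = 0;

// The selector overload invokes the factory once per window and
// re-subscribes to the returned sequence each time a window closes.
using var sub = source
    .Window(() => { factoryCalls++; return close; })
    .Subscribe(_ => { });

Console.WriteLine(factoryCalls); // 1: factory ran for the first window

close.OnNext(Unit.Default);      // close the first window
Console.WriteLine(factoryCalls); // 2: factory ran again for the next window

// An element pushed from another thread between the close notification
// and the re-subscription can fall into this gap and be missed.
```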

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, IScheduler scheduler = null, Int32? maxCount = null)
{
    var s = scheduler ?? Scheduler.Default;

    var publish = stream.Publish(p =>
    {
        // Close the current window after 'delay' of inactivity
        var closes = p.Throttle(delay, s);

        if (maxCount != null)
        {
            Int32 i = 0;

            // Count elements manually and reset the counter when the
            // overflow fires, so counting starts over for each window
            var overflows = p.Where(x =>
            {
                ++i;

                if (i >= maxCount)
                {
                    i = 0;
                    return true;
                }

                return false;
            });

            closes = closes.Amb(overflows);
        }

        // Window(closes) subscribes to the closing sequence once for the
        // whole stream, instead of re-subscribing for every new window
        // as Window(() => closes) does
        return p.Window(closes).SelectMany(window => window.ToList());
    });

    return publish;
}
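For reference, a small usage sketch of the fixed method. It assumes the extension method above is defined in a static class that is in scope; the sleeps are generous so the throttle (100 ms of inactivity) has time to fire, but the exact batching is still timing-dependent.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Reactive.Subjects;
using System.Threading;

var subject = new Subject<int>();
var batches = new ConcurrentQueue<string>();

// Flush after 100 ms of inactivity, or once 3 items have arrived
using var sub = subject
    .BufferUntilInactive(TimeSpan.FromMilliseconds(100), maxCount: 3)
    .Subscribe(batch => batches.Enqueue(string.Join(",", batch)));

subject.OnNext(1);
subject.OnNext(2);
Thread.Sleep(400);   // inactivity -> throttle closes the first window

subject.OnNext(4);
Thread.Sleep(400);   // second batch closed the same way

// batches now holds "1,2" followed by "4"
```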
Mayr Philipp