I'm using Rx to process events in groups of at most X items, or once no new event has been sent for Y ms. For this purpose I use the code suggested in this answer.
This worked fine until I noticed that, when I wait for all events to be processed before continuing, some events occasionally stay in the queue unprocessed.
I found out that this happens when a new event is sent while the queue is still busy processing the previous events. If another event is sent after the processing has finished, all events in the queue are processed, including the ones that were stuck before.
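To make the intended grouping rule concrete, here is a minimal sketch of it without Rx, operating on already-recorded timestamps. `BatchingSketch`, `Batch`, `inactivityMs` and `maxCount` are names I made up for this illustration; they are not part of Rx or of the linked answer, and the real pipeline emits a group when the inactivity timer fires rather than when the next event arrives.

```csharp
using System;
using System.Collections.Generic;

public static class BatchingSketch
{
    // Hypothetical helper: splits events (sorted by timestamp) into groups.
    // A group is closed when it reaches maxCount items, or when the gap to
    // the next event is at least inactivityMs.
    public static List<List<int>> Batch(
        IReadOnlyList<(long TimeMs, int Value)> events, long inactivityMs, int maxCount)
    {
        var batches = new List<List<int>>();
        var current = new List<int>();
        long lastTime = 0;

        foreach (var (timeMs, value) in events)
        {
            // Inactivity rule: a long enough pause closes the open group.
            if (current.Count > 0 && timeMs - lastTime >= inactivityMs)
            {
                batches.Add(current);
                current = new List<int>();
            }

            current.Add(value);
            lastTime = timeMs;

            // Size rule: a full group is closed immediately.
            if (current.Count >= maxCount)
            {
                batches.Add(current);
                current = new List<int>();
            }
        }

        // Whatever is left forms the final group.
        if (current.Count > 0)
            batches.Add(current);

        return batches;
    }
}
```

For example, with `inactivityMs = 2000` and `maxCount = 2`, events 1 to 5 arriving at 0, 100, 5000, 5100 and 5200 ms would be grouped as [1, 2], [3, 4], [5]. This is the behavior I expect from the Rx pipeline as well, with no item left waiting in an open group indefinitely.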
Test code and output
To reproduce the behavior, I wrote some sample code which adds a number to the subject and then waits until the number is processed or the timeout (10 s) is hit.
If the number is processed, another number is added.
This second number is sent while the processing of the previous number is still busy, which leads to the described behavior.
The behavior can also be reproduced without the artificial delay; it just won't happen as regularly, because the timing has to be just right.

    private DispatcherScheduler _schedulerProviderDispatcher = new DispatcherScheduler(Application.Current.Dispatcher);
    private Subject<int> _numberEvents = new Subject<int>();
    private readonly ConcurrentDictionary<int, object> _numbersInProgress = new ConcurrentDictionary<int, object>();
    private IDisposable _disposable;

    public async Task SendNumbersAsync()
    {
        _disposable = _numberEvents.Synchronize()
            .BufferUntilInactive(TimeSpan.FromMilliseconds(2000), _schedulerProviderDispatcher, 100)
            .Subscribe(numbers =>
            {
                if (numbers.Count == 0)
                    return;

                var values = string.Empty;

                // Handle numbers in queue and remove them from the dictionary
                for (var i = 0; i < numbers.Count; ++i)
                {
                    // named 'handled' so it does not clash with the outer 'number' local
                    var handled = numbers[i];
                    if (_numbersInProgress.TryRemove(handled, out _) == false)
                        Trace.WriteLine($"Failed to remove number '{handled}'");

                    values += $"{handled}, ";
                    Trace.WriteLine($"Handled Number: {handled}, Count: {i + 1}/{numbers.Count}");
                }

                Trace.WriteLine($"Handled '{numbers.Count}' numbers. Values: '{values}'");

                // delay the execution by 1000ms to simulate a slow processing of the numbers
                // and create a timeframe where new numbers will be sent and received,
                // but the subject is still busy with the previous processing
                var task = Task.Delay(1000);
                Task.WaitAll(task);

                Trace.WriteLine($"Finished handling '{numbers.Count}' numbers. Values: '{values}'");
            });

        // push numbers to the subject
        var number = 0;
        while (_numbersInProgress.Count == 0)
        {
            ++number;
            Trace.WriteLine($"Sending number: {number}");
            _numberEvents.OnNext(number);
            Trace.WriteLine($"Finished sending number: {number}");

            // add the number to the progress dictionary
            if (_numbersInProgress.TryAdd(number, 0) == false)
                Trace.WriteLine($"Failed to add number '{number}'");

            var waitCount = 0;
            var timedOut = false;

            // wait for the numbers to be processed
            // if we waited 100 times (10sec) stop
            while (_numbersInProgress.Count != 0)
            {
                Trace.WriteLine($"Waiting ({waitCount})");
                await Task.Delay(100);
                ++waitCount;
                if (waitCount > 100)
                {
                    timedOut = true;
                    break;
                }
            }

            if (timedOut)
                Trace.WriteLine($"Timeout waiting: {number}");
            else
                Trace.WriteLine($"Finished waiting: {number}");
        }

        // if we still have unprocessed numbers
        // send another number to the subject to trigger processing of both numbers
        if (_numbersInProgress.Count != 0)
        {
            // Failed
            Trace.WriteLine($"Failed waiting: {number}");
            Trace.WriteLine($"Sending number: {9999}");
            _numberEvents.OnNext(9999);
            Trace.WriteLine($"Finished sending number: {9999}");
        }
    }

And the extension method, used to group the numbers:

    public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, IScheduler scheduler = null, Int32? maxCount = null)
    {
        var s = scheduler ?? Scheduler.Default;
        var publish = stream.Publish(p =>
        {
            // close the current window after 'delay' of inactivity
            var closes = p.Throttle(delay, s);
            if (maxCount != null)
            {
                // ... or as soon as 'maxCount' items have arrived, whichever comes first
                var overflows = p.Where((x, index) => index + 1 >= maxCount);
                closes = closes.Amb(overflows);
            }
            return p.Window(() => closes).SelectMany(window => window.ToList());
        });
        return publish;
    }

When executing this method, the trace outputs the following lines:
Number 2 is sent before the processing of the previously sent number 1 has finished.
The loop then runs into the timeout condition after 100 wait cycles and sends a new number, which triggers the processing of both numbers in the queue.
As you can see, I've already tried to solve this problem by wrapping the throttle and the grouping of the items (`BufferUntilInactive`) in a `Publish`, but without any success.
Am I missing anything in the `BufferUntilInactive` method or somewhere else?