Introduction and Assumptions
Using Reactive Extensions can definitely help with achieving the maximum concurrency. However, it's not trivial Rx, so if you are new to it, it's going to be a bit of a head-spinner. The implementation is quite concise, but the explanation is unfortunately lengthy because a lot of Rx concepts are used.
In order to keep things as sane as possible I make the following assumptions:
- Individual items do not appear as members of the collection more than once, i.e. you will not Add an item more than once without removing it first.
- Add and Remove tasks must not run concurrently for the same item.
- Add and Remove tasks can run concurrently for different items. Furthermore, this is desirable to achieve the best performance.
- An item has an appropriate Equals implementation (with a matching GetHashCode) to enable grouping Add and Remove events correctly
- If you clear or reset the collection, you must dispose the task handler below and start a new one
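The grouping step depends on the Equals assumption above. Here is a minimal sketch that assumes a hypothetical item type called Widget, identified by an integer Id. A C# 9 record gets value-based Equals and GetHashCode generated for it. The hand-written LegacyWidget class, also hypothetical, shows the equivalent for types that cannot be records.

```csharp
#nullable enable
using System;

// Hypothetical item type used only for illustration. A positional record
// generates value-based Equals and GetHashCode, so two Widget instances
// with the same Id compare equal and hash identically.
public sealed record Widget(int Id);

// Hand-written equivalent for types that cannot be records (also hypothetical).
public sealed class LegacyWidget : IEquatable<LegacyWidget>
{
    public LegacyWidget(int id) => Id = id;

    public int Id { get; }

    public bool Equals(LegacyWidget? other) => other is not null && other.Id == Id;

    public override bool Equals(object? obj) => Equals(obj as LegacyWidget);

    // Must agree with Equals: equal objects must produce equal hash codes,
    // because grouping looks items up by hash before comparing them.
    public override int GetHashCode() => Id.GetHashCode();
}
```

Relying on reference equality instead would put two distinct but "equal" instances into different groups, so their Add and Remove tasks would no longer be serialized.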
Handler function signature and usage
I create a function that handles running the add and remove tasks. It returns an IDisposable that you Dispose when you want to detach from the collection. The signature is as follows:
```csharp
private IDisposable HandleAddRemove<T>(
    ObservableCollection<T> collection,
    Func<T, Task> addActionAsync,
    Func<T, Task> removeActionAsync)
```
It accepts three arguments:
- The collection itself as an ObservableCollection<T>
- A factory function that accepts an item and returns a started Add background task
- A factory function that accepts an item and returns a started Remove background task
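Both factories are expected to return a task that is already running. Below is a minimal sketch of such a pair. It assumes Task.Delay as a stand-in for the real work and uses a hypothetical FactoryExample class with a Log queue for illustration only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class FactoryExample
{
    // Records which actions have started; purely illustrative.
    public static readonly ConcurrentQueue<string> Log = new ConcurrentQueue<string>();

    // An async lambda runs synchronously up to its first await, so the work
    // has already begun by the time the factory hands back its Task.
    public static readonly Func<string, Task> AddActionAsync = async item =>
    {
        Log.Enqueue("add " + item);
        await Task.Delay(10); // stand-in for real asynchronous work
    };

    public static readonly Func<string, Task> RemoveActionAsync = async item =>
    {
        Log.Enqueue("remove " + item);
        await Task.Delay(10); // stand-in for real asynchronous work
    };
}
```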
The implementation
First, I need to be able to get an enumerable that alternately returns the add and remove task factories. We'll see why later - here it is:
```csharp
private IEnumerable<Func<T, Task>> GetAddRemoveLoop<T>(
    Func<T, Task> addFunc,
    Func<T, Task> removeFunc)
{
    while (true)
    {
        yield return addFunc;
        yield return removeFunc;
    }
}
```
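To see the alternation in isolation, here is the same looping pattern. It uses string-returning functions in place of Func<T, Task> so the result is easy to inspect; the AddRemoveLoopDemo class and FirstN method are hypothetical. Because the iterator is lazy, Take safely cuts off the otherwise infinite loop.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class AddRemoveLoopDemo
{
    // Same shape as the helper above, but the factories return labels
    // instead of Tasks so the alternation is visible.
    public static IEnumerable<Func<T, string>> GetAddRemoveLoop<T>(
        Func<T, string> addFunc,
        Func<T, string> removeFunc)
    {
        while (true)
        {
            yield return addFunc;
            yield return removeFunc;
        }
    }

    // Takes the first `count` factories and applies each one to `item`.
    public static List<string> FirstN(string item, int count) =>
        GetAddRemoveLoop<string>(x => "add " + x, x => "remove " + x)
            .Take(count)
            .Select(f => f(item))
            .ToList();
}
```

For example, FirstN("item1", 3) yields "add item1", "remove item1", "add item1".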
Now for the implementation itself. I'll go through it piece by piece and then present the whole thing at the end. What we are building up is one big pipeline that will process each event and call the appropriate actions.
Here's the whole function:
```csharp
private IDisposable HandleAddRemove<T>(
    ObservableCollection<T> collection,
    Func<T, Task> addActionAsync,
    Func<T, Task> removeActionAsync)
{
    return Observable
        .FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
            h => collection.CollectionChanged += h,
            h => collection.CollectionChanged -= h)
        .Select(evt => evt.EventArgs)
        .Where(item => item.Action == NotifyCollectionChangedAction.Add ||
                       item.Action == NotifyCollectionChangedAction.Remove)
        .Select(x => x.Action == NotifyCollectionChangedAction.Add
            ? (T) x.NewItems[0]
            : (T) x.OldItems[0])
        .GroupBy(item => item)
        .SelectMany(item => item.Zip(
            GetAddRemoveLoop(addActionAsync, removeActionAsync),
            (i, action) => Observable.Defer(() => action(i).ToObservable())).Concat())
        .Subscribe();
}
```
Now let's break it down:
1. Hook up to the CollectionChanged event
FromEventPattern creates an IObservable<EventPattern<NotifyCollectionChangedEventArgs>>. It looks a little odd: the arguments are delegates that Rx calls to attach a handler to the event when the resulting observable is subscribed to, and to detach it again when that subscription ends (for example, when you dispose it).
```csharp
Observable.FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
    h => collection.CollectionChanged += h,
    h => collection.CollectionChanged -= h)
```
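For comparison, this sketch does by hand roughly what those two delegates do. It uses a plain event handler and only the BCL; the RawEventDemo class is hypothetical. Events raised while the handler is attached are captured, and the Add after detaching is not.

```csharp
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;

public static class RawEventDemo
{
    public static List<NotifyCollectionChangedEventArgs> Capture()
    {
        var collection = new ObservableCollection<string>();
        var captured = new List<NotifyCollectionChangedEventArgs>();
        NotifyCollectionChangedEventHandler handler = (sender, e) => captured.Add(e);

        collection.CollectionChanged += handler;  // what the first delegate does
        collection.Add("item1");
        collection.Remove("item1");
        collection.CollectionChanged -= handler;  // what the second delegate does

        collection.Add("item2"); // not captured: the handler is detached
        return captured;
    }
}
```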
2. Get the EventArgs from the EventPattern items
EventPattern is a bit unwieldy; we only need to work with the EventArgs property, so we can transform each item with a Select:
```csharp
.Select(evt => evt.EventArgs)
```
3. Filter to get just Add and Remove actions
Now we need to filter out all but the Add and Remove actions. We do this with a Where operator:
```csharp
.Where(item => item.Action == NotifyCollectionChangedAction.Add ||
               item.Action == NotifyCollectionChangedAction.Remove)
```
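ObservableCollection<T> raises more than just Add and Remove. The sketch below triggers Add, Replace, Move, Remove and Reset changes, then applies the same predicate as the Where above; only the Add and Remove entries survive. It uses a hypothetical FilterDemo class and LINQ to Objects over a captured list rather than Rx. Clear raises Reset, which carries no individual items, and this is presumably why the assumptions ask you to dispose the handler when clearing.

```csharp
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Linq;

public static class FilterDemo
{
    public static (List<NotifyCollectionChangedAction> All, List<NotifyCollectionChangedAction> Kept) Run()
    {
        var collection = new ObservableCollection<string>();
        var raised = new List<NotifyCollectionChangedEventArgs>();
        collection.CollectionChanged += (sender, e) => raised.Add(e);

        collection.Add("a");     // Add
        collection.Add("b");     // Add
        collection[0] = "c";     // Replace
        collection.Move(0, 1);   // Move
        collection.Remove("b");  // Remove
        collection.Clear();      // Reset

        // Same predicate as the Rx Where operator in step 3.
        var kept = raised
            .Where(e => e.Action == NotifyCollectionChangedAction.Add ||
                        e.Action == NotifyCollectionChangedAction.Remove)
            .Select(e => e.Action)
            .ToList();

        return (raised.Select(e => e.Action).ToList(), kept);
    }
}
```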
4. Extract the item from the EventArgs
We make use of the fact that ObservableCollection<T> raises a separate event for each item added or removed. So for an Add action there is exactly one item in the NewItems collection, and for a Remove action there is exactly one item in the OldItems collection. This Select extracts the item appropriately depending on the action type:
```csharp
.Select(x => x.Action == NotifyCollectionChangedAction.Add
    ? (T) x.NewItems[0]
    : (T) x.OldItems[0])
```
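Here is a quick check of the single-item claim, written as an ordinary event handler rather than an Rx pipeline; the ExtractDemo class and ExtractItem method are hypothetical. Each event carries a list with exactly one element, and the selector picks it from NewItems or OldItems.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;

public static class ExtractDemo
{
    // Same selector as the Select above, written as an ordinary generic method.
    public static T ExtractItem<T>(NotifyCollectionChangedEventArgs e) =>
        e.Action == NotifyCollectionChangedAction.Add
            ? (T)e.NewItems![0]!
            : (T)e.OldItems![0]!;

    public static (List<string> Items, List<int> Counts) Run()
    {
        var collection = new ObservableCollection<string>();
        var items = new List<string>();
        var counts = new List<int>();
        collection.CollectionChanged += (sender, e) =>
        {
            // Record how many items the event carried, then extract the item.
            var changed = e.Action == NotifyCollectionChangedAction.Add ? e.NewItems! : e.OldItems!;
            counts.Add(changed.Count);
            items.Add(ExtractItem<string>(e));
        };

        collection.Add("x");
        collection.Add("y");
        collection.Remove("x");
        return (items, counts);
    }
}
```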
5. Group occurrences of the same item together
For a given item, we want to ensure only one add or remove task can be in flight. To do this we will create a group for each item. In Rx, the GroupBy operator takes a stream and produces a stream of streams: a new stream is created for each group. It also takes a key selector, a function that computes a key for each item. Items whose keys are equal (according to Equals and GetHashCode) go into the same group. Here we use an identity function and assume a good implementation of Equals:
```csharp
.GroupBy(item => item)
```
The type of the stream returned is an IObservable<IGroupedObservable<T,T>>: a stream of group streams! There are two type parameters in a group stream, one for the key and one for the items in the group. In this case we are using the item itself as the key, so the type parameters are both T. A grouped observable is just like a regular observable with the addition of a Key property.
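The same grouping idea can be seen with LINQ to Objects, which has a GroupBy of the same shape; CountOccurrences below is a hypothetical helper. The difference is that the LINQ version pulls from a finished sequence. The Rx version instead pushes each item into its group as it arrives, and its groups never complete.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class GroupDemo
{
    // Identity key selector, as in the Rx pipeline: every occurrence of the
    // same item lands in the same group, in arrival order.
    public static Dictionary<string, int> CountOccurrences(IEnumerable<string> events) =>
        events.GroupBy(item => item)
              .ToDictionary(group => group.Key, group => group.Count());
}
```

For the sequence item1, item2, item1, item3, item2, item1 this produces three groups with 3, 2 and 1 occurrences respectively.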
6. Pass an item to an Add or Remove task
This is the tricky part! At this point, each group will contain the same item being sent repeatedly, once each time Add(item) or Remove(item) is called on the collection. The first will be for an Add, the second for a Remove, and so on.
Ignore the SelectMany for the moment; let's look inside it first:
We use the Zip operator, which will pair each item from the group stream with an add/remove factory function returned from the enumerable produced by the helper function described earlier.
The first argument to Zip is the enumerable, a call to our helper function.
The second is a "selector function". This accepts the zipped pair of item and factory function and combines them to get a result. So the Zip looks like this:
```csharp
item.Zip(GetAddRemoveLoop(addActionAsync, removeActionAsync),
    (i, action) => /* get result of combining item and action */)
```
And the output is going to be an IObservable<TResult>, where TResult is the type returned by the selector function.
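LINQ to Objects has a Zip with the same pairing behaviour, so the effect can be sketched without Rx. The ZipDemo class is hypothetical, and its "add"/"remove" labels stand in for the real factories. Zip stops when the shorter side runs out, which is why pairing with an infinite enumerable is safe.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class ZipDemo
{
    // Infinite alternating labels, standing in for GetAddRemoveLoop.
    static IEnumerable<string> AddRemoveLabels()
    {
        while (true)
        {
            yield return "add";
            yield return "remove";
        }
    }

    // Pairs each occurrence of an item with the next label in the loop.
    public static List<string> Pair(IEnumerable<string> occurrences) =>
        occurrences.Zip(AddRemoveLabels(), (i, action) => action + " " + i).ToList();
}
```

Three occurrences of item1 pair up as "add item1", "remove item1", "add item1".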
What we want to do now is turn the invocation of the Add/Remove function (which returns a started Task that completes when the add/remove action is complete) into an IObservable stream. There is a handy extension method, ToObservable, that converts a Task to an IObservable. For a plain (non-generic) Task it uses a special type, Unit, as the element type. Unit is a common functional type that is used when you need to know that something is done but you don't care about a result. So ToObservable will give us an IObservable<Unit> representing our asynchronous task.
A naive approach would be to do this:
```csharp
(i, action) => action(i).ToObservable()
```
The problem with this is that the lambda will be evaluated too soon. Zip invokes the selector as soon as each item arrives, so action(i) would start the task immediately, even while an earlier task for the same item is still running. We want to only call the add/remove task on subscription to the IObservable<Unit>. Observable.Defer will do that for us, only calling the factory you pass it on subscription. So instead of the above we do:
```csharp
(i, action) => Observable.Defer(() => action(i).ToObservable())
```
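The difference between calling the factory eagerly and deferring it can be shown without Rx. This sketch uses a plain Func<Task> as the "recipe" in place of Observable.Defer; the DeferDemo class and its Started counter are hypothetical. Nothing runs until the recipe is invoked.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class DeferDemo
{
    // Counts how many times the work has actually been started.
    public static int Started;

    static Task StartWork(string item)
    {
        Interlocked.Increment(ref Started);
        return Task.Delay(10); // stand-in for an add/remove task
    }

    // Eager: calling this starts the task at once, like
    // action(i).ToObservable() inside the Zip selector.
    public static Task Eager(string item) => StartWork(item);

    // Deferred: returns a recipe; the task starts only when it is invoked,
    // which is the role Observable.Defer plays on subscription.
    public static Func<Task> Deferred(string item) => () => StartWork(item);
}
```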
So we've called the Zip function on each group and produced an IObservable<IObservable<Unit>>: a stream of streams representing alternate calls to add/remove.
7. Ensuring add/remove are called serially
We now need to ensure each add/remove stream within the group is subscribed to one after the other. To do this we call Concat. This concatenates a stream of streams, subscribing to each sub-stream only when the preceding sub-stream has finished. So it converts our IObservable<IObservable<Unit>> to a flat IObservable<Unit>.
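Concat's effect on deferred tasks is similar to awaiting them one at a time in a loop. This sketch of that analog uses plain Func<Task> recipes in place of deferred observables; ConcatDemo is a hypothetical name. Even though the first step takes longer, the second does not begin until the first has ended.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ConcatDemo
{
    // Starts each recipe only after the previous one has completed.
    public static async Task RunSerially(IEnumerable<Func<Task>> recipes)
    {
        foreach (var recipe in recipes)
        {
            await recipe();
        }
    }

    public static async Task<List<string>> Demo()
    {
        var log = new List<string>();
        Func<string, int, Func<Task>> step = (name, ms) => async () =>
        {
            log.Add("begin " + name);
            await Task.Delay(ms); // simulated add/remove work
            log.Add("end " + name);
        };

        // The slower "add" still finishes before "remove" starts.
        await RunSerially(new[] { step("add", 50), step("remove", 10) });
        return log;
    }
}
```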
8. Flattening the groups
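Each group is still a stream, though. Now we can hop back to that SelectMany. We use it on the groups to take all the individual group streams and flatten them into a single stream, converting IObservable<IGroupedObservable<T,T>> to IObservable<Unit>. Because SelectMany subscribes to every group stream as it appears, different items are processed concurrently while each item's own tasks stay in order:
```csharp
.SelectMany(item => item.Zip(
    GetAddRemoveLoop(addActionAsync, removeActionAsync),
    (i, action) => Observable.Defer(() => action(i).ToObservable())).Concat())
```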
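Here are the two levels put together without Rx. The sketch uses plain Func<Task> recipes, with Task.WhenAll standing in for SelectMany and a sequential await loop standing in for Concat; FlattenDemo is a hypothetical name. Each group runs its steps in order, while the two groups overlap in time.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class FlattenDemo
{
    // Per-group serial execution (the role Concat plays).
    static async Task RunSerially(IEnumerable<Func<Task>> recipes)
    {
        foreach (var recipe in recipes)
        {
            await recipe();
        }
    }

    // Starts every group at once (the role SelectMany plays).
    public static Task RunGroups(IEnumerable<IEnumerable<Func<Task>>> groups) =>
        Task.WhenAll(groups.Select(g => RunSerially(g)));

    public static async Task<List<string>> Demo()
    {
        // Thread-safe log: continuations from different groups may run in parallel.
        var log = new ConcurrentQueue<string>();
        Func<string, Func<Task>> step = name => async () =>
        {
            log.Enqueue("begin " + name);
            await Task.Delay(50); // simulated add/remove work
            log.Enqueue("end " + name);
        };

        await RunGroups(new[]
        {
            new[] { step("add item1"), step("remove item1") },
            new[] { step("add item2"), step("remove item2") },
        });
        return log.ToList();
    }
}
```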
9. Now Run it!
Finally, we just call Subscribe on this. We don't care about the results; the stream itself is calling our add/remove tasks. Since the groups themselves are non-terminating, this stream won't end unless we dispose the IDisposable we get back:
```csharp
.Subscribe();
```
This will call Subscribe on the operator it is attached to, and each operator will subscribe to the operator above it, and so on right the way up to the initial FromEventPattern.
Remember to Dispose the returned IDisposable if you need to clear down the collection and/or unsubscribe from the CollectionChanged event.
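The attach/detach pairing behind that IDisposable can be sketched by hand. EventSubscription and DisposeDemo are hypothetical helpers, not part of Rx. The constructor runs the attach delegate and Dispose runs the detach delegate at most once, much like the two delegates passed to FromEventPattern.

```csharp
#nullable enable
using System;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Threading;

public sealed class EventSubscription : IDisposable
{
    private Action? _detach;

    // Attaches immediately and remembers how to detach.
    public EventSubscription(Action attach, Action detach)
    {
        attach();
        _detach = detach;
    }

    // Detaches exactly once, even if Dispose is called repeatedly.
    public void Dispose() => Interlocked.Exchange(ref _detach, null)?.Invoke();
}

public static class DisposeDemo
{
    // Returns how many CollectionChanged events the handler observed.
    public static int Run()
    {
        var collection = new ObservableCollection<string>();
        int seen = 0;
        NotifyCollectionChangedEventHandler h = (sender, e) => seen++;

        using (new EventSubscription(
            () => collection.CollectionChanged += h,
            () => collection.CollectionChanged -= h))
        {
            collection.Add("a"); // observed
        }

        collection.Add("b"); // not observed: Dispose detached the handler
        return seen;
    }
}
```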
Complete Solution
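Here's the whole function again, together with its helper and the namespaces it uses (Observable and the Task ToObservable extension come from the System.Reactive NuGet package):
```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Reactive.Linq;             // Observable, GroupBy, Zip, Concat, ...
using System.Reactive.Threading.Tasks;  // Task.ToObservable()
using System.Threading.Tasks;

private IDisposable HandleAddRemove<T>(
    ObservableCollection<T> collection,
    Func<T, Task> addActionAsync,
    Func<T, Task> removeActionAsync)
{
    return Observable
        .FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
            h => collection.CollectionChanged += h,
            h => collection.CollectionChanged -= h)
        .Select(evt => evt.EventArgs)
        .Where(item => item.Action == NotifyCollectionChangedAction.Add ||
                       item.Action == NotifyCollectionChangedAction.Remove)
        .Select(x => x.Action == NotifyCollectionChangedAction.Add
            ? (T) x.NewItems[0]
            : (T) x.OldItems[0])
        .GroupBy(item => item)
        .SelectMany(item => item.Zip(
            GetAddRemoveLoop(addActionAsync, removeActionAsync),
            (i, action) => Observable.Defer(() => action(i).ToObservable())).Concat())
        .Subscribe();
}

private IEnumerable<Func<T, Task>> GetAddRemoveLoop<T>(
    Func<T, Task> addFunc,
    Func<T, Task> removeFunc)
{
    while (true)
    {
        yield return addFunc;
        yield return removeFunc;
    }
}
```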
Sample Usage
Here's a usage example where we provide long-running add/remove tasks to show the effect clearly:
```csharp
var collection = new ObservableCollection<string>();

Func<string, Task> addAction = x =>
{
    Console.WriteLine("Begin add task for " + x);
    return Task.Delay(2000)
        .ContinueWith(t => Console.WriteLine("End add task for " + x));
};

Func<string, Task> removeAction = x =>
{
    Console.WriteLine("Begin remove task for " + x);
    return Task.Delay(3000)
        .ContinueWith(t => Console.WriteLine("End remove task for " + x));
};

var sub = HandleAddRemove(
    collection,
    addAction,
    removeAction);

collection.Add("item1");
Thread.Sleep(1000);
collection.Remove("item1");
Thread.Sleep(1000);
collection.Add("item2");
collection.Add("item3");
Thread.Sleep(5000);
collection.Remove("item3");
Thread.Sleep(1000);
collection.Remove("item2");
Thread.Sleep(30000);

Console.WriteLine("Done");
sub.Dispose();
```
This gives the result:
```
Begin add task for item1
Begin add task for item2
Begin add task for item3
End add task for item1
Begin remove task for item1
End add task for item3
End add task for item2
End remove task for item1
Begin remove task for item3
Begin remove task for item2
End remove task for item3
End remove task for item2
Done
```
I hope this hasn't bamboozled you too much. I am aware my assumptions may be off; if so, I hope this was useful or inspirational in some way anyway!