
I have an observable collection that I have bound a CollectionChanged handler to. When items are added to the collection, I need to bind an event handler to each of the items that were added. This in and of itself would not be so hard; however, I first have to do something on a separate thread and then bind the event handler to the thing obtained on that thread.

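To be clear, it is something like this:

using System.Collections.Specialized;

protected override void OnCollectionChanged(NotifyCollectionChangedEventArgs e)
{
    base.OnCollectionChanged(e); // keep raising CollectionChanged for other listeners

    switch (e.Action)
    {
        case NotifyCollectionChangedAction.Add:
            // Background operation, then register an event handler
            // on the object it produces
            break;
        case NotifyCollectionChangedAction.Remove:
            // Background operation, then unregister that event handler
            break;
    }
}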

Now the problem is: what happens if there is an Add and then immediately a Remove for the same object? The first background thread may not have run yet when the second one runs, thereby causing an exception.

My thought is that there is probably a really good way to handle this using reactive extensions so I have been looking into it but I haven't been able to find anything specific.

Any ideas?

Digital Powers

3 Answers


Introduction and Assumptions

Reactive Extensions (Rx) can definitely help you achieve maximum concurrency here. However, this is not trivial Rx, so if you are new to it, it's going to be a bit of a head-spinner. The implementation is quite concise, but the explanation is unfortunately lengthy because a lot of Rx concepts are used.

In order to keep things as sane as possible I make the following assumptions:

  • Individual items do not appear as members of the collection more than once, i.e. you will not Add an item more than once without removing it first
  • Add and Remove tasks must not run concurrently for the same item.
  • Add and Remove tasks can be run concurrently for different items. Furthermore this is desirable to achieve best performance.
  • An item has appropriate Equals and GetHashCode implementations to enable grouping Add and Remove events correctly
  • If you clear or reset the collection, you must dispose the task handler below and start a new one

Handler function signature and usage

I create a function that handles running the add and remove tasks. It returns an IDisposable you Dispose when you want to detach from the collection. The signature is as follows:

    private IDisposable HandleAddRemove<T>(
        ObservableCollection<T> collection,
        Func<T, Task> addActionAsync,
        Func<T, Task> removeActionAsync)

It accepts three arguments:

  • The collection itself as an ObservableCollection<T>
  • A factory function that accepts an item and returns a started Add background task
  • A factory function that accepts an item and returns a started Remove background task

The implementation

First, I need to be able to get an enumerable that alternately returns the add and remove task factories. We'll see why later - here it is:

private IEnumerable<Func<T, Task>> GetAddRemoveLoop<T>(
    Func<T, Task> addFunc,
    Func<T, Task> removeFunc)
{
    while (true)
    {
        yield return addFunc;
        yield return removeFunc;
    }
}

Now for the implementation itself. What we are building up is one big pipeline that processes each event and calls the appropriate actions. I'll show the whole function first, go through it piece by piece, and then present it again at the end.

Here's the whole function:

private IDisposable HandleAddRemove<T>(
    ObservableCollection<T> collection,
    Func<T, Task> addActionAsync,
    Func<T, Task> removeActionAsync)
{
    return Observable
        .FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
            h => collection.CollectionChanged += h,
            h => collection.CollectionChanged -= h)
        .Select(evt => evt.EventArgs)
        .Where(item => item.Action == NotifyCollectionChangedAction.Add ||
                        item.Action == NotifyCollectionChangedAction.Remove)
        .Select(x => x.Action == NotifyCollectionChangedAction.Add
                            ? (T) x.NewItems[0]
                            : (T) x.OldItems[0])
        .GroupBy(item => item)
        .SelectMany(item => item.Zip(
            GetAddRemoveLoop(addActionAsync, removeActionAsync),
            (i, action) => Observable.Defer(() => action(i).ToObservable())).Concat())
        .Subscribe();
}

Now let's break it down:

1. Hook up to the CollectionChanged event

FromEventPattern creates an IObservable<EventPattern<XXXEventArgs>>. It looks a little odd: the arguments are delegates that are called to attach to the event when the resulting observable is subscribed to, and to detach from it when that subscription is disposed (which also happens if the pipeline terminates, normally or with an error).

Observable.FromEventPattern
    <NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
    h => collection.CollectionChanged += h,
    h => collection.CollectionChanged -= h)

2. Get the EventArgs from the EventPattern items

EventPattern is a bit unwieldy - we only need to work with the EventArgs property so we can transform each item with a Select:

.Select(evt => evt.EventArgs)

3. Filter to get just Add and Remove actions

Now we need to filter out all but the Add and Remove actions. We do this with a Where operator:

.Where(item => item.Action == NotifyCollectionChangedAction.Add ||
               item.Action == NotifyCollectionChangedAction.Remove)

4. Extract the item from the EventArgs

We make use of the fact that ObservableCollection<T> raises a separate event for each Add and Remove: for an Add action there is exactly one item in the NewItems collection, and for a Remove action there is exactly one item in the OldItems collection. This Select extracts the item appropriately depending on the action type:

.Select(x => x.Action == NotifyCollectionChangedAction.Add
             ? (T) x.NewItems[0]
             : (T) x.OldItems[0])
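The single-item behaviour can be checked without Rx. This standalone sketch (plain .NET, an illustration rather than part of the answer's code) hooks CollectionChanged directly and logs what ObservableCollection<T>.Add and Remove report. It does not exercise Clear, which raises a Reset event carrying no items, and other collection types may report several items in one event:

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;

var collection = new ObservableCollection<string>();
var log = new List<string>();

collection.CollectionChanged += (sender, e) =>
{
    // Same extraction as the Select above: NewItems for Add, OldItems for Remove.
    var item = e.Action == NotifyCollectionChangedAction.Add
        ? (string)e.NewItems![0]!
        : (string)e.OldItems![0]!;

    // How many items the event actually carried.
    var count = e.Action == NotifyCollectionChangedAction.Add
        ? e.NewItems!.Count
        : e.OldItems!.Count;

    log.Add($"{e.Action} {item} ({count} item)");
};

collection.Add("item1");
collection.Add("item2");
collection.Remove("item1");

foreach (var line in log) Console.WriteLine(line);
// Add item1 (1 item)
// Add item2 (1 item)
// Remove item1 (1 item)
```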

5. Group occurrences of the same item together

For a given item, we want to ensure that only one add or remove task can be in flight at a time. To do this we create a group for each item. In Rx, the GroupBy operator takes a stream and produces a stream of streams: a new stream is created for each group. It also takes a key selector, a function that extracts a key from each item; items with equal keys go into the same group. Here we use an identity function and assume good implementations of .Equals and .GetHashCode:

.GroupBy(item => item)

The type of the stream returned is an IObservable<IGroupedObservable<T,T>>! A stream of group streams. There are two type parameters in a group stream: one for the key and one for the items in the group. In this case we are using the item itself as the key, so the type parameters are both T. A grouped observable is just like a regular observable with the addition of a Key property.
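Rx's GroupBy works on a live stream, but its grouping rule is the same as LINQ-to-Objects' GroupBy, so a finished array makes a convenient stand-in. In this sketch the occurrence list is made-up sample data, and string supplies the Equals/GetHashCode that the identity key relies on:

```csharp
using System;
using System.Linq;

// Made-up sample: item occurrences in the order their Add/Remove events arrived.
var occurrences = new[] { "item1", "item2", "item1", "item3", "item2" };

// LINQ-to-Objects analogue of the Rx GroupBy: identity key selector,
// so equality of keys comes from string's Equals/GetHashCode.
var groups = occurrences.GroupBy(item => item);

foreach (var grp in groups)
    Console.WriteLine($"{grp.Key}: {grp.Count()} occurrence(s)");
// item1: 2 occurrence(s)
// item2: 2 occurrence(s)
// item3: 1 occurrence(s)
```

The difference in Rx is that each group is emitted as soon as its first item arrives and stays open, receiving later occurrences as they happen.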

6. Pass an item to an Add or Remove task

This is the tricky part! At this point, each group will contain the same item sent repeatedly, once each time Add(item) or Remove(item) is called on the collection. The first occurrence will be for an Add, the second for a Remove, and so on.

Ignore the SelectMany for the moment; let's look inside it first:

We use the Zip operator, which pairs each item from the group stream with an add/remove factory function returned from the enumerable produced by the helper function described earlier.

The first argument to Zip is the enumerable - a call to our helper function.

The second is a "selector function". This accepts the zipped pair of item and factory function and combines them to get a result. So the Zip looks like this:

item.Zip(GetAddRemoveLoop(addActionAsync, removeActionAsync),
        (i, action) => /* get result of combining item and action */)

And the output is going to be an IObservable<TResult> where TResult is the type returned by the selector function.
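Rx's Zip against an IEnumerable pairs elements by position, much like LINQ-to-Objects' Enumerable.Zip, and it is only ever as long as the shorter side. As a synchronous analogy only (an array standing in for one group's stream, and string-returning factories standing in for the Task-returning ones), the pairing for a single item looks like this:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Same shape as GetAddRemoveLoop above, generalized over the result type
// so placeholder string-returning factories can be used instead of Tasks.
IEnumerable<Func<T, TResult>> GetAddRemoveLoop<T, TResult>(
    Func<T, TResult> addFunc, Func<T, TResult> removeFunc)
{
    while (true)
    {
        yield return addFunc;
        yield return removeFunc;
    }
}

Func<string, string> add = x => "add " + x;
Func<string, string> remove = x => "remove " + x;

// One group: "item1" was added, removed, then added again.
var occurrences = new[] { "item1", "item1", "item1" };

// Zip stops at the end of the finite group, so the infinite loop is safe.
var calls = occurrences
    .Zip(GetAddRemoveLoop(add, remove), (i, action) => action(i))
    .ToList();

Console.WriteLine(string.Join(", ", calls)); // add item1, remove item1, add item1
```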

What we want to do now is turn the invocation of the Add/Remove function (which returns a started Task that completes when the add/remove action is complete) into an IObservable stream. There is a handy extension method called ToObservable that converts a Task to an IObservable. For a plain (non-generic) Task, it uses a special type, Unit, as the element type. Unit is a common functional type used when you need to know that something is done but you don't care about a result. So ToObservable gives us an IObservable<Unit> representing our asynchronous task.

A naive approach would be to do this:

(i, action) => action(i).ToObservable();

The problem with this is that action(i) is invoked too soon: as soon as Zip pairs the item with a factory, the add/remove task starts, even if the previous task for the same item is still running. We want to start the add/remove task only when the IObservable<Unit> is subscribed to. Observable.Defer does that for us, only calling the function you pass it on subscription. So instead of the above we do:

(i, action) => Observable.Defer(() => action(i).ToObservable())
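Observable.Defer itself needs Rx, but the eager-versus-deferred distinction it addresses can be shown with plain Tasks. In this sketch a Func<Task> delegate plays the role of the deferred observable; `action` and the `started` counter are hypothetical stand-ins for a real add/remove factory:

```csharp
using System;
using System.Threading.Tasks;

var started = 0;

// Hypothetical stand-in for an add/remove factory: calling it starts work immediately.
Func<string, Task> action = item =>
{
    started++;
    return Task.Delay(10);
};

// Eager: the task is already running as soon as this line executes,
// which is what the naive selector does.
Task eager = action("item1");
Console.WriteLine($"after eager call: {started}"); // 1

// Deferred: wrapping the call in a delegate postpones it, which is the
// role Observable.Defer plays until Concat subscribes.
Func<Task> deferred = () => action("item1");
Console.WriteLine($"after wrapping:   {started}"); // 1

await deferred();
Console.WriteLine($"after invoking:   {started}"); // 2
await eager;
```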

So we've called the Zip function on each group and produced an IObservable<IObservable<Unit>>: a stream of streams representing alternate calls to add/remove.

7. Ensuring add/remove are called serially

We now need to ensure that the add/remove streams within each group are subscribed to one after the other. To do this we call Concat. This concatenates a stream of streams, subscribing to each sub-stream only when the preceding sub-stream has finished. So it converts our IObservable<IObservable<Unit>> to a flat IObservable<Unit>.
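Concat's ordering guarantee is similar to awaiting deferred tasks one at a time. As an analogy only (plain async/await, not Rx), with hypothetical add/remove factories where the add deliberately takes longer than the remove:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

// Hypothetical factories: add takes 50 ms, remove only 10 ms.
Func<string, Task> add = async item =>
{
    log.Add("begin add " + item);
    await Task.Delay(50);
    log.Add("end add " + item);
};
Func<string, Task> remove = async item =>
{
    log.Add("begin remove " + item);
    await Task.Delay(10);
    log.Add("end remove " + item);
};

// One group's deferred work, in event order (like the Zip output).
var queue = new List<Func<Task>> { () => add("item1"), () => remove("item1") };

// Analogy for Concat: start each piece of work only after the previous one finished.
foreach (var next in queue)
    await next();

Console.WriteLine(string.Join(Environment.NewLine, log));
// begin add item1
// end add item1
// begin remove item1
// end remove item1
```

If both delegates were started up front and then awaited with Task.WhenAll, the shorter remove could finish before the add, which is the kind of race the question describes.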

8. Flattening the groups

Each group is still a stream though. Now we can hop back to that SelectMany. We use it on the groups to take all the individual group streams and flatten them to a single stream. We are converting IObservable<IGroupedObservable<T,T>> -> IObservable<Unit>:

.SelectMany(item => // the Zip and Concat result
                    // that produced the `IObservable<Unit>` of add/remove tasks
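Putting the per-group ordering and the SelectMany merge together, the overall behaviour resembles running one sequential chain per item and letting the chains run concurrently. This is a plain async/await analogy, not the Rx pipeline; `RunGroupAsync` and the sample items are hypothetical:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Thread-safe log, since different groups log from different threads.
var log = new ConcurrentQueue<string>();

Func<string, Task> add = async item =>
{
    log.Enqueue("begin add " + item);
    await Task.Delay(50);
    log.Enqueue("end add " + item);
};
Func<string, Task> remove = async item =>
{
    log.Enqueue("begin remove " + item);
    await Task.Delay(10);
    log.Enqueue("end remove " + item);
};

// Runs one group's work strictly in order (the Concat step).
async Task RunGroupAsync(IEnumerable<Func<Task>> work)
{
    foreach (var next in work)
        await next();
}

// Two groups run concurrently with each other, like SelectMany merging
// the per-group streams; inside each group add always precedes remove.
var groups = new[] { "item2", "item3" }
    .Select(item => RunGroupAsync(new List<Func<Task>> { () => add(item), () => remove(item) }));

await Task.WhenAll(groups);

var entries = log.ToList();
foreach (var entry in entries) Console.WriteLine(entry);
```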

9. Now Run it!

Finally we just call Subscribe on this. We don't care about the results - the stream itself is calling our add/remove tasks. Since the groups themselves are non-terminating, this stream won't end unless we dispose the IDisposable we get back:

.Subscribe();

This calls Subscribe on the operator it is attached to, and each operator subscribes to the operator above it, and so on, right the way up to the initial FromEventPattern.

Remember to Dispose the returned IDisposable if you need to clear down the collection and/or unsubscribe from the CollectionChanged event.

Complete Solution

Here's the whole function again:

using System;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Reactive.Linq;               // Observable operators (System.Reactive package)
using System.Reactive.Threading.Tasks;    // Task.ToObservable()
using System.Threading.Tasks;

private IDisposable HandleAddRemove<T>(
    ObservableCollection<T> collection,
    Func<T, Task> addActionAsync,
    Func<T, Task> removeActionAsync)
{
    return Observable
        .FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
            h => collection.CollectionChanged += h,
            h => collection.CollectionChanged -= h)
        .Select(evt => evt.EventArgs)
        .Where(item => item.Action == NotifyCollectionChangedAction.Add ||
                        item.Action == NotifyCollectionChangedAction.Remove)
        .Select(x => x.Action == NotifyCollectionChangedAction.Add
                            ? (T) x.NewItems[0]
                            : (T) x.OldItems[0])
        .GroupBy(item => item)
        .SelectMany(item => item.Zip(
            GetAddRemoveLoop(addActionAsync, removeActionAsync),
            (i, action) => Observable.Defer(() => action(i).ToObservable())).Concat())
        .Subscribe();
}

Sample Usage

Here's a usage where we provide long running add/remove tasks to show the effect clearly:

using System;
using System.Collections.ObjectModel;
using System.Threading;
using System.Threading.Tasks;

var collection = new ObservableCollection<string>();

Func<string, Task> addAction = x =>
{
    Console.WriteLine("Begin add task for " + x);
    return Task.Delay(2000)
            .ContinueWith(t => Console.WriteLine("End add task for " + x));
};

Func<string, Task> removeAction = x =>
{
    Console.WriteLine("Begin remove task for " + x);
    return Task.Delay(3000)
            .ContinueWith(t => Console.WriteLine("End remove task for " + x));
};

var sub = HandleAddRemove(
    collection,
    addAction,
    removeAction);

collection.Add("item1");
Thread.Sleep(1000);
collection.Remove("item1");
Thread.Sleep(1000);
collection.Add("item2");
collection.Add("item3");
Thread.Sleep(5000);
collection.Remove("item3");
Thread.Sleep(1000);
collection.Remove("item2");

Thread.Sleep(30000);

Console.WriteLine("Done");

sub.Dispose();

This gives the result:

Begin add task for item1
Begin add task for item2
Begin add task for item3
End add task for item1
Begin remove task for item1
End add task for item3
End add task for item2
End remove task for item1
Begin remove task for item3
Begin remove task for item2
End remove task for item3
End remove task for item2
Done

Hope this hasn't bamboozled you too much. I am aware my assumptions may be too far off; if so, I hope this was useful or inspirational in some way anyway!

James World
  • This is fantastic, it does make sense. My only question is: how can you assert that the add/remove array will always be 1 long? Is that something about the CollectionChanged event I don't know, or are you assuming that a list of items is never added at once? – Digital Powers Nov 21 '13 at 17:13
  • It's just how `ObservableCollection` is implemented. There are custom variants on it that do multiple changes; shouldn't be hard to modify for that... But this was getting quite long enough already! – James World Nov 21 '13 at 17:32
  • I'm gonna throw some salt in your game and remind you about Reset: http://stackoverflow.com/questions/4495904/what-is-notifycollectionchangedaction-reset-value – Ana Betts Nov 22 '13 at 07:47
  • I covered that one in the assumptions... Need to dispose and resub? – James World Nov 22 '13 at 09:09
  • I should add that due to the assumptions and probably lots of other cases, this isn't a perfect solution... there's no way I could cover that in an SO question. The intention was to be pedagogical - for more production ready solutions, I'd definitely recommend taking a look at Paul Betts's ReactiveUI. I use it often in production code. – James World Nov 22 '13 at 09:39
  • Ah, so you did - yeah, dealing with all the edge cases in NotifyCollectionChanged is *tough*, so many things can go Weird, especially if you add async into the mix (since the notifications are effectively deltas, not maintaining ordering means they are meaningless) – Ana Betts Nov 22 '13 at 20:05

In ReactiveUI, I handle a somewhat similar case using Buffer on a timer; this means you can "debounce" quick changes to a collection:

https://github.com/reactiveui/ReactiveUI/blob/5.2.0/ReactiveUI.Platforms/Cocoa/ReactiveTableViewSource.cs#L68

Another interesting piece of code might be the implementation of ActOnEveryObject, which runs a block for every item in a list (even as that list is changed):

https://github.com/reactiveui/ReactiveUI/blob/29dcee6427407cfe189df246994ecad73f09bad2/ReactiveUI/AutoPersistHelper.cs#L96

Ana Betts

There could be multiple answers to your problem, and that depends on your exact situation.

To be safe, I assume that your collection can change from any thread. The best approach is to check whether the item is a member of the collection before registering or unregistering the event handler, and also to guard the block with a lock statement:

using System.Collections.Specialized;

private readonly object _eventRegistrationLock = new object();

protected override void OnCollectionChanged(NotifyCollectionChangedEventArgs e)
{
    base.OnCollectionChanged(e);

    switch (e.Action)
    {
        case NotifyCollectionChangedAction.Add:
            // Background operation
            lock (_eventRegistrationLock)
            {
                // check that the collection still contains the item,
                // then register the event
            }
            break;
        case NotifyCollectionChangedAction.Remove:
            // Background operation
            lock (_eventRegistrationLock)
            {
                // check that the collection no longer contains the item
                // (it may have been re-added), then unregister the event
            }
            break;
    }
}
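A runnable sketch of the lock-and-recheck idea. These details are assumptions, not from the answer: registration is modelled by a HashSet of registered items, each background operation is a Task.Run, and collection mutations take the same lock so the background Contains checks never read the collection while it is being modified. The Remove branch re-checks that the item is not in the collection, so a quick re-add is not undone:

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Threading.Tasks;

var gate = new object();                        // shared lock (assumption: also guards mutations)
var collection = new ObservableCollection<string>();
var registered = new HashSet<string>();         // hypothetical stand-in for "event is registered"
var pending = new List<Task>();                 // background operations, awaited at the end

collection.CollectionChanged += (sender, e) =>
{
    switch (e.Action)
    {
        case NotifyCollectionChangedAction.Add:
            var added = (string)e.NewItems![0]!;
            pending.Add(Task.Run(() =>
            {
                lock (gate)
                {
                    // Only register if the item was not removed in the meantime.
                    if (collection.Contains(added)) registered.Add(added);
                }
            }));
            break;
        case NotifyCollectionChangedAction.Remove:
            var removed = (string)e.OldItems![0]!;
            pending.Add(Task.Run(() =>
            {
                lock (gate)
                {
                    // Only unregister if the item was not re-added in the meantime.
                    if (!collection.Contains(removed)) registered.Remove(removed);
                }
            }));
            break;
    }
};

// Mutate the collection under the same lock the background checks use.
void Mutate(Action change) { lock (gate) change(); }

Mutate(() => collection.Add("item1"));
Mutate(() => collection.Remove("item1"));   // immediate remove races with the add task
Mutate(() => collection.Add("item2"));

await Task.WhenAll(pending);
Console.WriteLine(string.Join(",", registered)); // item2
```

Whichever order the background tasks run in, item1 ends up unregistered and item2 registered, because each task decides from the collection's state under the lock rather than from the event that scheduled it.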

You can also use a lock around the whole switch statement. Then you could eliminate the membership check, but that reduces performance, because it makes all the collection changes synchronous, i.e., no two changes will happen at the same time (if OnCollectionChanged is called before another call finishes, it has to wait for that call to finish).

Mohammad Dehghan