The Reactive Extensions allow you to easily subscribe to an event using Observable.FromEventPattern, but I can't find anything on how you might implement an event when you have an IObservable.

My situation is this: I need to implement an interface which contains an event. That event is supposed to be raised whenever a certain value of my object changes, and for thread-safety reasons I need to raise it on a specific SynchronizationContext. I am also supposed to invoke each event handler with the current value on registration.

public interface IFooWatcher
{
    event FooChangedHandler FooChanged;
}

Getting an observable that does what I want is rather easy with Rx using BehaviorSubject:

public class FooWatcher
{
    private readonly BehaviorSubject<Foo> m_subject;
    private readonly IObservable<Foo> m_observable;

    public FooWatcher(SynchronizationContext synchronizationContext, Foo initialValue)
    {
        m_subject = new BehaviorSubject<Foo>(initialValue);
        m_observable = m_subject
            .DistinctUntilChanged()
            .ObserveOn(synchronizationContext);
    }

    public event FooChangedHandler FooChanged
    {
        add { /* ??? */ }
        remove { /* ??? */ }
    }
}

Now I am looking for an easy way to have the add and remove accessors subscribe and unsubscribe the passed FooChangedHandler as an IObserver<Foo> on m_observable. My current implementation looks similar to this:

    add
    {
        lock (m_lock)
        {
            // value.Invoke converts the FooChangedHandler to the Action<Foo> that Subscribe expects.
            IDisposable disp = m_observable.Subscribe(value.Invoke);
            m_registeredObservers.Add(
                new KeyValuePair<FooChangedHandler, IDisposable>(
                    value, disp));
        }
    }

    remove
    {
        lock (m_lock)
        {
            KeyValuePair<FooChangedHandler, IDisposable> observerDisposable =
                m_registeredObservers
                    .First(pair => object.Equals(pair.Key, value));
            m_registeredObservers.Remove(observerDisposable);
            observerDisposable.Value.Dispose();
        }
    }

However, I hope to find an easier solution, because I need to implement several of these events (of differing handler types). I tried to roll my own generic solution but it creates some additional problems that need to be worked around (in particular, how you generically work with a delegate that takes a parameter of T), so I would prefer to find an existing solution that bridges the gap in this direction - just as FromEventPattern does the reverse.

Medo42

2 Answers

You could do this:

public event FooChangedHandler FooChanged
{
    add { m_observable.ToEvent().OnNext += value; }
    remove { m_observable.ToEvent().OnNext -= value; }
}

However, on the remove, you may just want to dispose of the subscription, or get the Action from ToEvent() and store that as a member. Untested.

EDIT: You'll have to use Action<Foo> instead of a FooChangedHandler delegate, however.

EDIT 2: Here's a tested version. I suppose you need to use FooChangedHandler, however, since you have a bunch of these pre-existing handlers?

void Main()
{
    IObservable<Foo> foos = new [] { new Foo { X = 1 }, new Foo { X = 2 } }.ToObservable();
    var watcher = new FooWatcher(SynchronizationContext.Current, new Foo { X = 12 });
    watcher.FooChanged += o => o.X.Dump();  
    foos.Subscribe(watcher.Subject.OnNext); 
}

// Define other methods and classes here

//public delegate void FooChangedHandler(Foo foo);
public interface IFooWatcher
{
    event Action<Foo> FooChanged;
}

public class Foo {
    public int X { get; set; }
}
public class FooWatcher
{

    private readonly BehaviorSubject<Foo> m_subject;
    public BehaviorSubject<Foo> Subject { get { return m_subject; } }
    private readonly IObservable<Foo> m_observable;

    public FooWatcher(SynchronizationContext synchronizationContext, Foo initialValue)
    {
        m_subject = new BehaviorSubject<Foo>(initialValue);

        m_observable = m_subject
            .DistinctUntilChanged();
    }

    public event Action<Foo> FooChanged
    {
        add { m_observable.ToEvent().OnNext += value; }
        remove { m_observable.ToEvent().OnNext -= value; }
    }
}
Richard Anthony Hein
  • Thanks, I had not seen `ToEvent()` before! This is a big piece of the puzzle already. The conversion between `Action` and `FooChangedHandler` is still an issue (yes, unfortunately we're stuck with these). Problem: Delegates are equal if they have the same target and method, but this equality only reaches one level - if you have a delegate and wrap it in two separate `Action` instances those will be equal, but if you wrap both of those `Action` instances in another `Action` each, *those* will *not* be equal. However, I think I saw a solution to this already... – Medo42 Mar 05 '13 at 21:11
  • Ah yes, here it was: http://stackoverflow.com/a/9290684/575615 I'll test this tomorrow, big thanks again! – Medo42 Mar 05 '13 at 21:18
  • Everything works as expected now. Using a solution like the one linked above for converting the delegates to `Action` allowed me to write a short and sweet generic solution that works with `ToEvent()` internally. There is not much to that class, but I wanted to bundle the `Subject` and the `IEventSource` together in one object. – Medo42 Mar 06 '13 at 14:32
  • Sorry to retract the accept again, but I just noticed a serious issue with your example code. Calling `ToEvent` will always create a *new* `IEventSource`, so your code will try to deregister from a different event than it registered on. I tried this out, and it definitely does not work. Call `ToEvent` once and store the result instead. I'll re-accept once you fix this. – Medo42 Mar 06 '13 at 14:40
  • Ok, cool, I'll try to get to it later today/tonight ... I have an exam today. Cheers. – Richard Anthony Hein Mar 06 '13 at 19:27
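
The fix described in the comments above (call `ToEvent()` once, store the resulting `IEventSource<Foo>`, and convert each `FooChangedHandler` to an `Action<Foo>` that keeps the same target and method) might look roughly like the sketch below. It is untested against the asker's code base and makes a few assumptions: `FooChangedHandler` is assumed to have the signature from the commented-out declaration in the answer, `ToAction` and `Update` are hypothetical helpers introduced only for illustration, and handlers are assumed to be non-null, single-cast delegates. It needs the System.Reactive package.

```csharp
using System;
using System.Reactive;            // IEventSource<T>
using System.Reactive.Linq;       // DistinctUntilChanged, ObserveOn, ToEvent
using System.Reactive.Subjects;   // BehaviorSubject<T>
using System.Threading;

public class Foo { public int X { get; set; } }

// Assumed signature, taken from the commented-out declaration in the answer.
public delegate void FooChangedHandler(Foo foo);

public interface IFooWatcher
{
    event FooChangedHandler FooChanged;
}

public class FooWatcher : IFooWatcher
{
    private readonly BehaviorSubject<Foo> m_subject;
    private readonly IEventSource<Foo> m_eventSource;

    public FooWatcher(SynchronizationContext synchronizationContext, Foo initialValue)
    {
        m_subject = new BehaviorSubject<Foo>(initialValue);

        // ToEvent() is called exactly once, so add and remove below
        // operate on the same IEventSource<Foo> instance.
        m_eventSource = m_subject
            .DistinctUntilChanged()
            .ObserveOn(synchronizationContext)
            .ToEvent();
    }

    // Hypothetical helper for pushing a new value into the subject.
    public void Update(Foo value)
    {
        m_subject.OnNext(value);
    }

    public event FooChangedHandler FooChanged
    {
        add { m_eventSource.OnNext += ToAction(value); }
        remove { m_eventSource.OnNext -= ToAction(value); }
    }

    // Hypothetical helper: builds an Action<Foo> with the same target and
    // method as the handler, so two conversions of the same handler compare
    // equal and the remove accessor can find the earlier registration.
    // Assumes a non-null, single-cast handler.
    private static Action<Foo> ToAction(FooChangedHandler handler)
    {
        return (Action<Foo>)Delegate.CreateDelegate(
            typeof(Action<Foo>), handler.Target, handler.Method);
    }
}
```

Because every handler added to `OnNext` results in its own subscription to the underlying observable, a `BehaviorSubject` source should still replay the current value to each newly registered handler, which is the behaviour the question asks for.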

Given that you are already mixing the boundaries between reactive and more conventional code, you could do a less reactive version. To start, simply declare a normal event pattern:

public event FooChangedHandler FooChanged;

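protected void OnFooChanged(Foo foo)
{
  // Copy the delegate so a concurrent unsubscribe cannot null it between the check and the call.
  var temp = FooChanged;
  if (temp != null)
  {
    temp(new FooChangedEventArgs(foo));
  }
}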

and then simply connect the observable to it in the constructor:

m_observable.Subscribe(foo => OnFooChanged(foo));

It's not very Rx but it is incredibly simple.

AlSki
  • You can leave out `OnFooChanged` completely by assigning a `delegate{}` to `FooChanged` at the very start, collapsing this solution to two lines. However, this approach does not meet my requirements, because new subscribers to the event do not receive the current state. Besides, since my `SynchronizationContext` essentially posts to an event loop, new subscribers can receive changes that (using Java memory model terms) happened-before the subscription was established. This is probably not a problem, but I would rather avoid it. – Medo42 Mar 06 '13 at 09:08
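
For reference, the two-line variant mentioned in the comment above could be sketched as follows. This is only an illustration: `SimpleFooWatcher` is a hypothetical class name, `FooChangedHandler` is assumed to take a single `Foo`, and the `SynchronizationContext` handling from the question is left out. As the comment points out, handlers registered later do not receive the current value with this approach.

```csharp
using System;
using System.Reactive.Linq;       // DistinctUntilChanged
using System.Reactive.Subjects;   // BehaviorSubject<T>

public class Foo { public int X { get; set; } }

// Assumed signature; the question does not show the real declaration.
public delegate void FooChangedHandler(Foo foo);

// Hypothetical class name, used only for this sketch.
public class SimpleFooWatcher
{
    private readonly BehaviorSubject<Foo> m_subject;

    // Line one: the empty delegate means FooChanged is never null,
    // so no null check (and no OnFooChanged method) is needed.
    public event FooChangedHandler FooChanged = delegate { };

    public SimpleFooWatcher(Foo initialValue)
    {
        m_subject = new BehaviorSubject<Foo>(initialValue);

        // Line two: forward every distinct value to the event.
        // The initial value is raised here, before any handler can attach.
        m_subject.DistinctUntilChanged().Subscribe(foo => FooChanged(foo));
    }
}
```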