I hit something new to me with the following piece of code when following the equivalent in C# here. The compiler gives multiple errors basically telling the IConnectableObservable created in source.Publish()
does not match IObservable
even though it derives from it (according to the MSDN article linked).
Is there something in F# that is different with regard to C# concerning inheritance in this case or can someone provider pointers as to what is going on? Have I just made a typo I can't see? What comes to the heading regarding covariance, it's just a wild guess as I'm at least temporarily out of ideas. And so, maybe writing somewhere may help me and others...
One example of the many error messages:
No overloads match for method 'Create'. The available overloads are shown below (or in the Error List window).
No overloads match for method 'Switch'. The available overloads are shown below (or in the Error List window).
Error Possible overload: '(extension) IObservable.Switch<'TSource>() : IObservable<'TSource>'. Type constraint mismatch. The type IObservable<IConnectableObservable<'b>> is not compatible with type IObservable<IObservable<'a>> The type 'IObservable<'a>' does not match the type 'IConnectableObservable<'b>'.
open System.Reactive.Concurrency
open System.Reactive.Disposables
open System.Reactive.Subjects
open System.Reactive.Linq
type Observable with
static member inline Suspendable(source: IObservable<_>, suspend: IObservable<bool>, isSuspendedInitially: bool): IObservable<_> =
Observable.Create<_>(fun observer ->
let shared = source.Publish()
let pausable =
suspend.StartWith(isSuspendedInitially)
.TakeUntil(shared.LastOrDefaultAsync())
.DistinctUntilChanged()
.Select(fun p -> if p then shared else Observable.Empty<_>())
.Switch()
new CompositeDisposable(pausable.Subscribe(observer), shared.Connect()))
The corresponding C# code
public static class RxExtensions
{
public static IObservable<T> Suspendable<T>(this IObservable<T> stream, IObservable<bool> suspend, bool isSuspendedInitially)
{
return Observable.Create<T>(o =>
{
var shared = stream.Publish();
var pausable = suspend
.StartWith(isSuspendedInitially)
.TakeUntil(shared.LastOrDefaultAsync())
.DistinctUntilChanged()
.Select(p => p ? shared : Observable.Empty<T>())
.Switch();
return new CompositeDisposable(pausable.Subscribe(o), shared.Connect());
});
}
}