This is the same question as "How do I await a response from an RX Subject without introducing a race condition?", but in F#.

The C# solution looks like:

static async void Foo()
{
    var subject = new Subject<int>();
    var firstInt = subject.FirstAsync().PublishLast();
    firstInt.Connect();
    subject.OnNext(42);

    var x = await firstInt;
    Console.WriteLine("Done waiting: " + x);
}

My attempt in F# is this:

let foo () =
    async {
        use subject = new Subject<int>()
        let firstInt = subject.FirstAsync().PublishLast()
        firstInt.Connect() |> ignore
        subject.OnNext(42)

        let! x = firstInt
        printfn "Done waiting: %d" x
        return ()
    }

The let! x = firstInt line gives the compile error "This expression was expected to have type Async<'a> but here has type IConnectableObservable<int>", so apparently C# does something under the hood that F# doesn't.

Is there a C# implicit interface cast at work here, that I need to do explicitly in F#? If so, I can't figure out what it is.

John Reynolds
  • In F# there is no implicit interface casting; you need to do `x :> IWhatever` – John Palmer Jan 14 '16 at 00:41
  • Out of interest, must you use subjects? My understanding is that this is not recommended: http://stackoverflow.com/questions/14396449/why-are-subjects-not-recommended-in-net-reactive-extensions?rq=1 The points made about avoiding mutable state become more pertinent in F#. – TheInnerLight Jan 14 '16 at 00:48
  • @JohnPalmer: Problem is, I don't know what interface IWhatever should be in this case, or even if an interface cast is actually what's needed in this case. – John Reynolds Jan 14 '16 at 01:07
  • @TheInnerLight: Not sure how else I should solve my problem, which is very similar to http://stackoverflow.com/questions/24394418/how-to-use-reactive-extensions-to-parse-a-stream-of-characters-from-a-serial-por , though in my case the input is a network stream instead of a serial port. – John Reynolds Jan 14 '16 at 01:13
  • I don't see a description of the problem. Does your code not work somehow? How? Or does it not compile? What's the error then? – Fyodor Soikin Jan 14 '16 at 01:39
  • Sorry, forgot the most important info. I've updated the question. – John Reynolds Jan 14 '16 at 01:46

1 Answer

After further digging, it seems that C# calls GetAwaiter() under the hood when you await something. For a Subject or an IObservable, GetAwaiter returns an AsyncSubject, which isn't immediately useful in F#, but the ToTask extension method in System.Reactive.Threading.Tasks makes it useful. Apparently, you can apply ToTask directly to a Subject (or an IObservable) without going by way of GetAwaiter, so my problem is solved by changing the let! x ... statement to:

    let! x = firstInt.ToTask() |> Async.AwaitTask
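
For reference, here is a minimal sketch of the whole function with that change applied. The `#r "nuget: ..."` line assumes an F# script run with a recent `dotnet fsi`; in a compiled project you would reference the System.Reactive package instead. Returning `x` (rather than unit) and binding the connection with `use` are my own small additions, not part of the original attempt.

```fsharp
#r "nuget: System.Reactive" // assumption: F# script; use a package reference in a project

open System.Reactive.Linq            // FirstAsync, PublishLast
open System.Reactive.Subjects        // Subject, IConnectableObservable
open System.Reactive.Threading.Tasks // ToTask

let foo () =
    async {
        use subject = new Subject<int>()

        // Publish and connect before pushing, so the value is captured
        // even though nobody is awaiting yet.
        let firstInt = subject.FirstAsync().PublishLast()
        use _connection = firstInt.Connect()
        subject.OnNext(42)

        // ToTask gives a Task<int>; Async.AwaitTask turns that into an Async<int>.
        let! x = firstInt.ToTask() |> Async.AwaitTask
        printfn "Done waiting: %d" x
        return x
    }

foo () |> Async.RunSynchronously |> ignore
```

The ordering matters here: because the published observable is connected before `OnNext` is called, the value is held until the `let!` subscribes via `ToTask`.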

edit:

There's a better way

Using FSharpx.Async is a much better way of accomplishing the same thing:

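open System.Reactive.Subjects
open FSharpx.Control.Observable

let foo () =
    async {
        use subject = new Subject<int>()
        // Caveat: a plain Subject does not replay values. An OnNext issued
        // before the await below subscribes is not seen by it, so in real
        // code the value must be pushed after the subscription is in place.
        subject.OnNext(42)

        // AwaitObservable subscribes and resumes on the next OnNext
        let! x = Async.AwaitObservable subject
        printfn "Done waiting: %d" x
        return ()
    }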
John Reynolds