5

I'm trying to understand async workflows in F# but I found one part that I really don't understand.

The following code works fine:

let asynWorkflow = async{
    let! result = Stream.TryOpenAsync(partition) |> Async.AwaitTask 
    return result
    } 

let stream = Async.RunSynchronously asynWorkflow
             |> fun openResult -> if openResult.Found then openResult.Stream else Stream(partition)

I define a async workflow where TryOpenAsync returns a Task<StreamOpenResult> type. I convert it to Async<StreamOpenResult> with Async.AwaitTask. (Side quest: "Await"Task? It doesn't await it just convert it, does it? I think it has nothing to do with Task.Wait or the await keyword). I "await" it with let! and return it. To start the workflow I use RunSynchronously which should start the workflow and return the result (bind it). On the result I check if the Stream is Found or not.

But now to my first question. Why do I have to wrap the TryOpenAsync call in another async computation and let! ("await") it? E.g. the following code does not work:

let asynWorkflow =  Stream.TryOpenAsync(partition) |> Async.AwaitTask  

let stream = Async.RunSynchronously asynWorkflow
             |> fun openResult -> if openResult.Found then openResult.Stream else Stream(partition)

I thought the AwaitTask makes it an Async<T> and RunSynchronously should start it. Then use the result. What do I miss?

My second question is why is there any "Async.Let!" function available? Maybe because it does not work or better why doesn't it work with the following code?

let ``let!`` task = async{
    let! result = task |> Async.AwaitTask 
   return result
   } 

let stream = Async.RunSynchronously ( ``let!`` (Stream.TryOpenAsync(partition))  )
         |> fun openResult -> if openResult.Found then openResult.Stream else Stream(partition)

I just insert the TryOpenAsync as a parameter but it does not work. By saying does not work I mean the whole FSI will hang. So it has something to do with my async/"await".

--- Update:

Result of working code in FSI:

>

Real: 00:00:00.051, CPU: 00:00:00.031, GC gen0: 0, gen1: 0, gen2: 0
val asynWorkflow : Async<StreamOpenResult>
val stream : Stream

Result of not working code in FSI:

>

And you cannot execute anything in the FSI anymore

--- Update 2

I'm using Streamstone. Here the C# example: https://github.com/yevhen/Streamstone/blob/master/Source/Example/Scenarios/S04_Write_to_stream.cs

and here the Stream.TryOpenAsync: https://github.com/yevhen/Streamstone/blob/master/Source/Streamstone/Stream.Api.cs#L192

halfer
  • 19,824
  • 17
  • 99
  • 186
KCT
  • 287
  • 1
  • 10
  • In what way does your code 'not work'? What's the error? You shouldn't need to create an `async` workflow. – Lee Nov 16 '17 at 13:11
  • As mentioned in my last sentence I start it with FSI and it runs and runs ("hangs"). No return on the FSI. I can just restart the FSI – KCT Nov 16 '17 at 13:19

3 Answers3

4

I can't tell you why the second example doesn't work without knowing what Stream and partition are and how they work.

However, I want to take this opportunity to point out that the two examples are not strictly equivalent.

F# async is kind of like a "recipe" for what to do. When you write async { ... }, the resulting computation is just sitting there, not actually doing anything. It's more like declaring a function than like issuing a command. Only when you "start" it by calling something like Async.RunSynchronously or Async.Start does it actually run. A corollary is that you can start the same async workflow multiple times, and it's going to be a new workflow every time. Very similar to how IEnumerable works.

C# Task, on the other hand, is more like a "reference" to an async computation that is already running. The computation starts as soon as you call Stream.TryOpenAsync(partition), and it's impossible to obtain a Task instance before the task actually starts. You can await the resulting Task multiple times, but each await will not result in a fresh attempt to open a stream. Only the first await will actually wait for the task's completion, and every subsequent one will just return you the same remembered result.

In the async/reactive lingo, F# async is what you call "cold", while C# Task is referred to as "hot".

Fyodor Soikin
  • 78,590
  • 9
  • 125
  • 172
  • I'm using Streamstone but want to do it in F#. Here the C# example https://github.com/yevhen/Streamstone/blob/master/Source/Example/Scenarios/S04_Write_to_stream.cs In that repo you will also find the Steam.TryOpenAsync https://github.com/yevhen/Streamstone/blob/master/Source/Streamstone/Stream.Api.cs#L192 Thanks for pointing out that difference. Again I learned something new – KCT Nov 17 '17 at 06:39
3

The second code block looks like it should work to me. It does run it if I provide dummy implementations for Stream and StreamOpenResult.

You should avoid using Async.RunSynchronously wherever possible because it defeats the purpose of async. Put all of this code within a larger async block and then you will have access to the StreamOpenResult:

async {
    let! openResult = Stream.TryOpenAsync(partition) |> Async.AwaitTask  
    let stream = if openResult.Found then openResult.Stream else Stream(partition)
    return () // now do something with the stream
    }

You may need to put a Async.Start or Async.RunSynchronously at the very outer edge of your program to actually run it, but it's better if you have the async (or convert it to a Task) and pass it to some other code (e.g. a web framework) that can call it in a non-blocking manner.

TheQuickBrownFox
  • 10,544
  • 1
  • 22
  • 35
  • Good point. Of cause it should only be on the very outer edge. But playing around with it I thought that the second and especially third code block should work but it doesn't... Just trying to understand why. – KCT Nov 16 '17 at 14:20
3

Not that I want to answer your question with another question, but: why are you doing code like this anyway? That might help to understand it. Why not just:

let asyncWorkflow = async {
    let! result = Stream.TryOpenAsync(partition) |> Async.AwaitTask 
    if result.Found then return openResult.Stream else return Stream(partition) }

There's little point in creating an async workflow only to immediately call RunSynchronously on it - it's similar to calling .Result on a Task - it just blocks the current thread until the workflow returns.

Fyodor Soikin
  • 78,590
  • 9
  • 125
  • 172
Isaac Abraham
  • 3,422
  • 2
  • 23
  • 26
  • Streamstone in version 2 is now returns Task instead of T so I had to change it. Starting with async in F# I want to learn a bit more about it an played around. For me it looks the same as your Listing 36.2. in your book just with a method returning Task that I have to convert. But something has to be different as it does not work and I try to figure out if Task has something to do with it. – KCT Nov 17 '17 at 06:49
  • The second question I got exactly from your code snipped in your answer. Why do I have to bind the result with let! to a result value and cannot pipe it into the next function using that result: Steam.TryOpenAsync(partition) |> Async.AwaitTask |> (fun result ->...) or at least ...Async.AwaitTask |> Async."Let!" |> (fun result ->...) – KCT Nov 17 '17 at 06:50
  • 1
    You could easily create a function that does this, probably called Async.map, which maps one Async result to another. This then avoids the need to have the async { } block completely e.g. https://gist.github.com/isaacabraham/91702cd88c5912e1a968a472de727421 is a simple version of this. – Isaac Abraham Nov 18 '17 at 09:36
  • let! is the F# equivalent of await i.e. asynchronously wait for the result and then continue afterwards (maybe on another thread). – Isaac Abraham Nov 18 '17 at 09:37
  • In your gist @isaac-abraham you also stay within an async workflow. My "Let!" implementation exits it. So I take with me so far: stay in (async) workflow and run it. Do not exit it. – KCT Nov 20 '17 at 07:40