I have some computation-intensive tasks that currently run on only one core, i.e. 1/8th of my machine's capacity. At the end of each task, I write a log entry to a file.

What would be the most graceful way to handle this IO when the tasks run in parallel?

Making the write itself async?

Sending messages to an agent that processes the writes sequentially?

[<Fact>]
let boom () = 
    let tasks =
        [1 .. 10]
        |> Seq.map (fun components ->
            async {
                // do compute-intensive stuff
                use writer = new StreamWriter("samefile")
                writer.WriteLine "toto" })
    tasks |> Async.Parallel |> Async.RunSynchronously

Edit

I ended up doing this: I replaced the new StreamWriter in my async block with a synchronous call to the agent.

[<Fact>]
let pasBoom () = 
    let tasks =
        [2 .. 2 .. 17]
        |> Seq.map (fun components ->
            async {
                // do compute-intensive stuff
                //use writer = new StreamWriter("samefile")
                use writerHandle = repoFileHandle.PostAndReply(fun replyChannel -> GetFile(@"samefile", replyChannel))
                printfn "%A" (writerHandle.getWriter().ToString())
                writerHandle.getWriter().WriteLine "toto" })
    tasks |> Async.Parallel |> Async.RunSynchronously

And the agent (there may be bugs, so please be careful; I just needed something quick myself):

open System
open System.IO
open System.Collections.Generic

// Agent is assumed to be the usual alias for MailboxProcessor.
type Agent<'T> = MailboxProcessor<'T>

type IDisposableWriter = 
    inherit IDisposable
    abstract getWriter : unit -> StreamWriter

type StreamMessage = | GetFile of string * AsyncReplyChannel<IDisposableWriter>

let repoFileHandle =
    let writerCount = new Dictionary<string, int>()
    let writerRepo  = new Dictionary<string, StreamWriter>()

    Agent.Start(fun inbox ->
        async { while true do
                    let! msg = inbox.Receive()
                    match msg with
                    | GetFile(filename, reply) -> 
                        // Dispose runs on the caller's thread, not in the agent, so every
                        // access to the dictionaries goes through the same lock.
                        let writer =
                            lock writerRepo (fun () ->
                                if not (writerRepo.ContainsKey(filename)) then
                                    writerRepo.[filename]  <- new StreamWriter(filename, true)
                                    writerCount.[filename] <- 0
                                writerCount.[filename] <- writerCount.[filename] + 1
                                writerRepo.[filename])

                        let handle = { new IDisposableWriter with 
                                         member this.getWriter () = writer
                                         member this.Dispose() =
                                             lock writerRepo (fun () ->
                                                 writerCount.[filename] <- writerCount.[filename] - 1
                                                 // Close the file once the last user has released it.
                                                 if writerCount.[filename] = 0 then
                                                     writer.Dispose()
                                                     writerRepo.Remove(filename) |> ignore) }
                        reply.Reply(handle) })

And, to avoid concurrent writes:

type WriteToStreamMessage = | WriteToStream of string * string

let fileWriterAgent =
    Agent.Start(fun inbox ->
        async { while true do
                    let! msg = inbox.Receive()
                    match msg with
                    | WriteToStream(filename, content) -> 
                        // The agent handles one message at a time, so writes are serialized.
                        use writerHandle = repoFileHandle.PostAndReply(fun replyChannel -> GetFile(filename, replyChannel))
                        writerHandle.getWriter().WriteLine content })
nicolas
  • Are you aware of [the differences between TPL and async](http://stackoverflow.com/a/1871643/162396)? – Daniel Mar 05 '13 at 15:01
  • My phrasing is ambiguous. I meant async, which would just compose continuations. I don't know much about TPL. – nicolas Mar 05 '13 at 15:02
  • Wouldn't async help distribute the work across the different cores as well? – nicolas Mar 05 '13 at 15:04
  • Can you include your code? – Daniel Mar 05 '13 at 15:22
  • Here it is. There are some race conditions on the IO resource. I guess an agent would be an answer, but I don't remember all the options available. – nicolas Mar 05 '13 at 15:32
  • I guess that if I acquire the stream outside and then write to it asynchronously, that will do, but it means changing the code quite deeply, as the write is buried in an object hierarchy. Maybe I should change the 'use' to a "stream provider agent" that hands out IDisposable handles, keeps an internal count of file users, and disposes the stream when the count reaches 0. – nicolas Mar 05 '13 at 15:39

1 Answer

Can you change your computation to return the message to be logged instead of writing it to a file? Then you could use PSeq in PowerPack, which is a thin wrapper over TPL:

open Microsoft.FSharp.Collections

let work n = sprintf "running task %d" n
let msgs = PSeq.init 10 work |> PSeq.toList
use writer = new System.IO.StreamWriter(@"C:\out.log")
msgs |> List.iter writer.WriteLine
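
The same idea should also work without PowerPack, using Async.Parallel from the core library. This is a minimal sketch, not a drop-in replacement: work stands in for the real computation, and the file name out.log is a placeholder.

```fsharp
open System.IO

// Hypothetical stand-in for the compute-intensive work; it returns the log line
// instead of writing it.
let work n = async { return sprintf "running task %d" n }

// Run the computations in parallel; Async.Parallel returns results in input order.
let msgs =
    [ 1 .. 10 ]
    |> List.map work
    |> Async.Parallel
    |> Async.RunSynchronously

// Write all lines from a single thread, so the file is never shared between tasks.
File.WriteAllLines("out.log", msgs)
```

Because the write happens only after all tasks have finished, nothing is logged if the process dies midway; that trade-off may or may not matter for your logs.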
Daniel
  • Unfortunately, I have quite a few places where I write to different files, so I am looking for a more generic way. Thanks for the idea, though. – nicolas Mar 05 '13 at 16:26
  • IO is not synchronized, so you might offload it to a "queued IO agent" so writes are serialized. – Daniel Mar 05 '13 at 17:36
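
A "queued IO agent" along those lines could look roughly like the sketch below. It is only an illustration under assumptions: LogMessage, logAgent, task and the file name agent.log are hypothetical names, and appending with File.AppendAllText on every message favours simplicity over throughput.

```fsharp
open System
open System.IO

// Hypothetical message type: a line to append to a file, or a request to
// confirm that everything posted so far has been written.
type LogMessage =
    | Write of string * string
    | Flush of AsyncReplyChannel<unit>

// A single agent performs every write, one message at a time, so writes are serialized.
let logAgent =
    MailboxProcessor<LogMessage>.Start(fun inbox ->
        let rec loop () = async {
            let! msg = inbox.Receive()
            match msg with
            | Write (path, line) ->
                File.AppendAllText(path, line + Environment.NewLine)
            | Flush reply ->
                // Messages are processed in order, so earlier writes are already done.
                reply.Reply(())
            return! loop () }
        loop ())

// Each task does its computation and only posts its log line to the agent.
let task n = async {
    // compute-intensive work would go here
    logAgent.Post(Write("agent.log", sprintf "task %d done" n)) }

[ 1 .. 10 ]
|> List.map task
|> Async.Parallel
|> Async.RunSynchronously
|> ignore

// Block until the agent has drained its queue before the program exits.
logAgent.PostAndReply(fun ch -> Flush ch)
```

Since the file path travels with each message, the same agent can serve several log files, and the order of lines in the file follows the order in which tasks post, not the task numbers.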