0

When I use Giraffe or ASP.Net Core in general, I can create an actor system, add it as a service and then get It thought the request handler select any actor and ask/tell a message.

Either using Cluster.Sharding or a normal user/actor I know it will be a single instance of the actor in the whole system processing multiple messages.

How can I do the same communication with Streams? They don’t seem to be references in the router, or the actor system as the actor paths: Actor References, Path and Addresses.

Should this be done differently?

Copying from the IO section, I could materialize one graph to handle each request, but in general I communicate with “Singletons” like Domain Driven Design Aggregate Roots to handle the domain logic (thats why the sharding module), I’m not sure how to do Singleton Sinks that can be used in the newly materialized graph in the request handler, as there must be only one sink for all the requests.

Coding Edgar
  • 1,285
  • 1
  • 8
  • 22

1 Answers1

2

There are many ways to integrate akka streams with external systems. The one that makes it easy recipient would be Source.queue (somewhat similar to System.Threading.Channels and predating them). You can materialize your stream at initialization point and then register queue endpoints in Giraffe DI - this way you don't pay cost of the same stream initialization on every request:

open Akka.Streams
open Akkling
open Akkling.Streams
open FSharp.Control.Tasks.Builders

let run () = task {
    use sys = System.create "sys" <| Configuration.defaultConfig()
    use mat = sys.Materializer()
    
    // construct a stream with async queue on both ends with buffer for 10 elements
    let sender, receiver =
        Source.queue OverflowStrategy.Backpressure 10
        |> Source.map (fun x -> x * x)
        |> Source.toMat (Sink.queue) Keep.both
        |> Graph.run mat
        
    // send data to a queue - quite often result could be just ignored
    match! sender.OfferAsync 2 with
    | :? QueueOfferResult.Enqueued -> () // successfull
    | :? QueueOfferResult.Dropped -> () // doesn't happen in OverflowStrategy.Backpressure 
    | :? QueueOfferResult.QueueClosed -> () // queue has been already closed
    | :? QueueOfferResult.Failure as f -> eprintfn "Unexpected failure: %O" f.Cause
    
    // try to receive data from the queue
    match! receiver.AsyncPull() with
    | Some data -> printfn "Received: %i" data
    | None -> printfn "Stream has been prematurelly closed"
        
    // asynchronously close the queue
    sender.Complete()
    do! sender.WatchCompletionAsync()
}
Bartosz Sypytkowski
  • 7,463
  • 19
  • 36