I'm building an application where I take a request from a user, call a REST API to get back some data, then based on that response, make another HTTP call and so on. Basically, I'm processing a tree of data where each node in the tree requires me to recursively call this API, like this:
    A
   / \
  B   C
 / \   \
D   E   F
I'm using Akka HTTP with Akka Streams to build the application, so I'm using its streaming API, like this:
val httpFlow = Http().cachedConnection(host = "localhost")
val flow = GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
  val merge = b.add(Merge[Data](2))
  val bcast = b.add(Broadcast[ResponseData](2))
  // Main path: user data -> request -> HTTP call -> parsed response
  takeUserData ~> merge ~> createRequest ~> httpFlow ~> processResponse ~> bcast
  // Feedback edge: subtrees extracted from each response re-enter the merge
  merge <~ extractSubtree <~ bcast
  FlowShape(takeUserData.in, bcast.out(1))
}
I understand that the best way to handle recursion in an Akka Streams application is to do it outside of the stream, but since I'm recursively calling the HTTP flow to get each subtree of data, I wanted to make sure that the flow is properly backpressured in case the API becomes overloaded.
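To make that concrete, here is a minimal sketch of what I mean by handling the recursion outside of the stream. It is not my actual code: `RecursiveFetch`, `tree`, `fetchChildren`, and `fetchAll` are hypothetical names, and the in-memory map is only an assumed stand-in for the REST API, using the example tree from above.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object RecursiveFetch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for the REST API: maps each node to its children.
  val tree: Map[String, List[String]] = Map(
    "A" -> List("B", "C"),
    "B" -> List("D", "E"),
    "C" -> List("F")
  )

  // Hypothetical replacement for the HTTP call; a real version would issue a request.
  def fetchChildren(node: String): Future[List[String]] =
    Future(tree.getOrElse(node, Nil))

  // Visit the tree one level at a time. All requests within a level run
  // concurrently, and the recursion ends when a level has no nodes left.
  def fetchAll(level: List[String]): Future[List[String]] =
    if (level.isEmpty) Future.successful(Nil)
    else
      Future.traverse(level)(fetchChildren).flatMap { children =>
        fetchAll(children.flatten).map(level ++ _)
      }

  def main(args: Array[String]): Unit = {
    val visited = Await.result(fetchAll(List("A")), 5.seconds)
    println(visited.mkString(", "))
  }
}
```

This terminates on its own because the recursion stops at the leaves, but nothing here limits how many requests are in flight within a level, which is why I would rather keep the calls inside the stream.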
The problem is that this stream never completes. If I hook it up to a simple source like this:
val source = Source.single(data)
val sink = Sink.seq[ResponseData]
source.via(flow).runWith(sink)
It prints out that it's processing all the data in the tree and then stops printing anything, just idling forever.
I read the documentation about cycles and the suggestion was to put a MergePreferred in there, but that didn't seem to help. This question helped, but I don't understand why MergePreferred wouldn't stop the deadlock, since unlike their example, the elements are removed from the stream at each level of the tree.
Why doesn't MergePreferred avoid the deadlock, and is there another way of doing this?