Here is what I ended up using:
import scala.concurrent.duration._
import scala.util.{Success, Try}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.stream._
import akka.stream.scaladsl._

// Carries the original element plus the requests still to perform and the responses collected so far.
case class FlowItem[I](i: I, requests: Seq[HttpRequest], responses: Seq[String]) {
  // Prepend the latest response (so responses end up in reverse request order).
  def withResponse(resp: String) = copy(responses = resp +: responses)
  // Take the next request to perform, keeping the remaining ones with the element.
  def extractNextRequest = (requests.head, copy(requests = requests.tail))
}

def apiFlow[I, O](requestPer: FiniteDuration,
                  buildRequests: I => Seq[HttpRequest],
                  buildOut: (I, Seq[String]) => O
                 )(implicit system: ActorSystem, materializer: ActorMaterializer) = {
  import system.dispatcher // ExecutionContext for the Future in mapAsync below
  GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    // Wrap each incoming element with its requests and an empty response list.
    val in: FlowShape[I, FlowItem[I]] =
      b.add(Flow[I].map(i => FlowItem(i, buildRequests(i), Seq.empty)))

    // Merge fresh elements with elements coming back from the feedback loop;
    // the feedback port is preferred, so elements already in flight finish first.
    val merge: MergePreferred.MergePreferredShape[FlowItem[I]] =
      b.add(MergePreferred[FlowItem[I]](1))

    val throttle: FlowShape[FlowItem[I], FlowItem[I]] =
      b.add(Flow[FlowItem[I]].throttle(1, requestPer, 1, ThrottleMode.shaping))

    // Pull out the next request, keeping the rest with the element.
    val prepareRequest: FlowShape[FlowItem[I], (HttpRequest, FlowItem[I])] =
      b.add(Flow[FlowItem[I]].map(_.extractNextRequest))

    val log =
      b.add(Flow[(HttpRequest, FlowItem[I])].map { r => Console.println(s"request to ${r._1.uri}"); r })

    val pool: FlowShape[(HttpRequest, FlowItem[I]), (Try[HttpResponse], FlowItem[I])] =
      b.add(Http(system).superPool[FlowItem[I]]())

    // Attach the response body to the element. Note that only successful 200 OK
    // responses are handled here; anything else fails the stream with a MatchError.
    val transformResponse: FlowShape[(Try[HttpResponse], FlowItem[I]), FlowItem[I]] =
      b.add(Flow[(Try[HttpResponse], FlowItem[I])].mapAsync(1) {
        case (Success(HttpResponse(StatusCodes.OK, headers, entity, _)), flowItem) =>
          entity.toStrict(1.second).map(resp => flowItem.withResponse(resp.data.utf8String))
      })

    // Port 0: no requests left, the element is done; port 1: more requests to perform.
    val split: UniformFanOutShape[FlowItem[I], FlowItem[I]] =
      b.add(Partition[FlowItem[I]](2, fi => if (fi.requests.isEmpty) 0 else 1))

    val out: FlowShape[FlowItem[I], O] =
      b.add(Flow[FlowItem[I]].map(fi => buildOut(fi.i, fi.responses)))

    in ~> merge ~> throttle ~> prepareRequest ~> log ~> pool ~> transformResponse ~> split ~> out
    // Elements that still have requests are fed back into the preferred merge port.
    merge.preferred <~ split

    FlowShape(in.in, out.out)
  }
}
The idea is to pass each element through the throttle as many times as it has requests, storing the not-yet-performed requests along with the element itself. The split (Partition) stage checks whether any requests are left: elements with remaining requests are routed back to the preferred port of the merge, while finished elements are emitted downstream.
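
For completeness, here is a rough sketch of how the graph could be wired up and run (on top of the imports above, plus akka.NotUsed). The endpoint URLs, the 200 ms rate, and the way responses are concatenated in buildOut are made-up examples, not part of the flow itself:

import akka.NotUsed

implicit val system = ActorSystem("api-flow")
implicit val materializer = ActorMaterializer()

// Hypothetical example: two requests per input id, response bodies joined into one string.
val flow: Flow[Int, String, NotUsed] = Flow.fromGraph(
  apiFlow[Int, String](
    requestPer = 200.millis,
    buildRequests = id => Seq(
      HttpRequest(uri = s"https://example.com/items/$id"),
      HttpRequest(uri = s"https://example.com/items/$id/details")
    ),
    buildOut = (id, responses) => s"$id -> ${responses.mkString(" | ")}"
  )
)

Source(1 to 3).via(flow).runForeach(println)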