
One of the stages of my computation graph is a flow of type Flow[Seq[Request], Seq[Response], NotUsed]. Obviously, this stage should assign a response to every request, and emit the seq once all of the requests are resolved.

Now, the underlying API has a harsh rate limiting policy, so I can only fire a single request per second. If I had a Flow of single Requests, I could zip this stream with one that emits a single element per second (How to limit an Akka Stream to execute and send down one message only once per second?), but I don't see a similar solution in this case.

Is there a nice way to express this? The idea that comes to my mind is to use the low-level Graph DSL, keep a one-second-tick stream as state there, and use it to process the sequences of requests, but I doubt it will turn out well.

roman-roman

2 Answers


As Victor said, you should probably use the built-in throttle. But in case you want to do it yourself, it may look like this:

private def throttleFlow[T](rate: FiniteDuration) = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  // Emits one tick per `rate`; zipping against it paces the main stream
  val ticker = Source.tick(rate, rate, ())

  val zip = builder.add(Zip[T, Unit])
  // Drop the tick, keeping only the original element
  val map = Flow[(T, Unit)].map { case (value, _) => value }
  val messageExtractor = builder.add(map)

  ticker ~> zip.in1
  zip.out ~> messageExtractor.in

  FlowShape.of(zip.in0, messageExtractor.out)
})

// And it will be used in your flow as follows
// .via(throttleFlow(FiniteDuration(200, MILLISECONDS)))

Also, since you're limiting access to some API, you may want to limit calls to it in a centralized fashion. Say you have multiple places in your project that make calls to the same external API, but because the calls come from the same IP, throttling should be applied to all of them together. For such a case, consider using MergeHub.source for your (supposedly) akka-http flow. Each caller will create and run a new graph to make a call.

expert
  • The problem I face is that this will limit the flow of `Seq[Request]` elements, but the rate limit applies to each `Request`. I.e. even if I limit the `Seq[Request]` flow to 1 per second, I'll fire the individual requests more frequently. I considered flattening the stream, but it's hard to aggregate the responses again, and it doesn't feel right. – roman-roman Nov 28 '16 at 12:37
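The aggregation difficulty the comment mentions can be modeled with plain collections: after flattening, every request needs explicit group bookkeeping before the responses can be reassembled. This is a rough sketch only; the `Tagged` wrapper and all names here are hypothetical, just to illustrate the overhead:

```scala
// Hypothetical model of flatten-then-regroup with plain collections
case class Tagged(groupId: Int, groupSize: Int, payload: String)

val groups = Seq(Seq("a", "b"), Seq("c"))

// Flatten: tag each request with its group id and the group's size,
// so the boundaries survive the flattened (throttled) stage
val flat = groups.zipWithIndex.flatMap { case (reqs, gid) =>
  reqs.map(r => Tagged(gid, reqs.size, r))
}

// Re-aggregate by group id afterwards to restore the original Seq boundaries
val regrouped = flat.groupBy(_.groupId).toSeq.sortBy(_._1).map(_._2.map(_.payload))
// regrouped == Seq(Seq("a", "b"), Seq("c"))
```

This is roughly the bookkeeping a flatten-then-regroup approach would have to carry through the throttled stage, which is why keeping the pending requests inside each element (as in the answer below) can feel cleaner.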

Here is what I'm ending up using:

  case class FlowItem[I](i: I, requests: Seq[HttpRequest], responses: Seq[String]) {
    // Responses are prepended, so they accumulate in reverse order of the requests
    def withResponse(resp: String) = copy(responses = resp +: responses)
    def extractNextRequest = (requests.head, copy(requests = requests.tail))
  }
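The bookkeeping in FlowItem can be exercised in isolation. A minimal sketch, with String standing in for akka-http's HttpRequest so it runs without any dependencies:

```scala
// Simplified FlowItem with String in place of HttpRequest
case class FlowItem[I](i: I, requests: Seq[String], responses: Seq[String]) {
  def withResponse(resp: String) = copy(responses = resp +: responses)
  def extractNextRequest = (requests.head, copy(requests = requests.tail))
}

val item = FlowItem(42, Seq("req1", "req2"), Seq.empty)

// Pull the next pending request off the element
val (first, rest) = item.extractNextRequest
// first == "req1"; rest.requests == Seq("req2")

// Record the response for the request just performed
val done = rest.withResponse("resp1")
// done.responses == Seq("resp1")
```

Note that responses come out in reverse request order, since withResponse prepends.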


 def apiFlow[I, O](requestPer: FiniteDuration,
                    buildRequests: I => Seq[HttpRequest],
                    buildOut: (I, Seq[String]) => O
                   )(implicit system: ActorSystem, materializer: ActorMaterializer) = {
    GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val in: FlowShape[I, FlowItem[I]] =
        b.add(Flow[I].map(i => FlowItem(i, buildRequests(i), Seq.empty)))

      val merge: MergePreferredShape[FlowItem[I]] =
        b.add(MergePreferred[FlowItem[I]](1))

      val throttle: FlowShape[FlowItem[I], FlowItem[I]] =
        b.add(Flow[FlowItem[I]].throttle(1, requestPer, 1, ThrottleMode.shaping))

      val prepareRequest: FlowShape[FlowItem[I], (HttpRequest, FlowItem[I])] =
        b.add(Flow[FlowItem[I]].map(_.extractNextRequest))

      val log =
        b.add(Flow[(HttpRequest, FlowItem[I])].map { r => Console.println(s"request to ${r._1.uri}"); r})

      val pool: FlowShape[(HttpRequest, FlowItem[I]), (Try[HttpResponse], FlowItem[I])] =
        b.add(Http(system).superPool[FlowItem[I]]())

      val transformResponse: FlowShape[(Try[HttpResponse], FlowItem[I]), FlowItem[I]] =
        b.add(Flow[(Try[HttpResponse], FlowItem[I])].mapAsync(1) {
          case (Success(HttpResponse(StatusCodes.OK, headers, entity, _)), flowItem) =>
            entity.toStrict(1.second).map(resp => flowItem.withResponse(resp.data.utf8String))
          // Note: failed requests and non-OK responses are not matched here,
          // so they will fail the stream with a MatchError
        })

      val split: UniformFanOutShape[FlowItem[I], FlowItem[I]] =
        b.add(Partition[FlowItem[I]](2, fi => if (fi.requests.isEmpty) 0 else 1))


      val out: FlowShape[FlowItem[I], O] =
        b.add(Flow[FlowItem[I]].map(fi => buildOut(fi.i, fi.responses)))

        in ~> merge ~> throttle ~> prepareRequest ~> log ~> pool ~> transformResponse ~> split ~> out
              merge.preferred   <~                                                       split

      FlowShape(in.in, out.out)
    }
  }

The idea is to pass each element through the throttle as many times as there are requests, storing the additional (not yet performed) requests along with the message. The split (Partition) stage checks whether there are any requests left.
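This feedback loop can be simulated without Akka to check the bookkeeping: an element cycles back through the merge and throttle once per pending request until its request list is empty. A plain-Scala model under that assumption (all names hypothetical):

```scala
// Model of one element circulating through the merge/throttle loop
case class Item(requests: List[String], responses: List[String])

// One pass through the throttled section: perform the head request
def step(it: Item): Item = it.requests match {
  case req :: rest => Item(rest, s"resp-for-$req" :: it.responses)
  case Nil         => it
}

// Loop until Partition would route the element to the "done" outlet
def runLoop(it: Item, passes: Int = 0): (Item, Int) =
  if (it.requests.isEmpty) (it, passes)      // Partition out 0: emit downstream
  else runLoop(step(it), passes + 1)         // Partition out 1: back to merge

val (finished, passes) = runLoop(Item(List("a", "b", "c"), Nil))
// passes == 3: one throttled pass per request
```

Each pass corresponds to one tick of the throttle, so an element with three requests occupies three one-second slots before it is emitted.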

roman-roman