1

Build on my earlier question, and with insights from Artem, my objective is to send get requests to a given url, and use Monix's throttling feature to space out the requests (to avoid hitting rate limits).

The expected workflow looks something like:

make 1 (or more) api call(s) -> apply back-pressure/pausing (based on throttle) -> make the next request -> so on and so forth..

This is what I have tried so far (below is a simplified snippet of my actual code):

import sttp.client.asynchttpclient.monix._
import monix.eval.Task
import monix.reactive.Observable
import sttp.client.{Response, UriContext}
import scala.concurrent.duration.DurationInt

object ObservableTest extends App {

  val activities = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    val ids: Task[List[Int]] = Task { (1 to 3).toList }
    val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
    val data: Task[List[Task[Response[Either[String, String]]]]] = ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
    data.guarantee(backend.close())
  }

  import monix.execution.Scheduler.Implicits.global

  val flat: Unit = activities.runToFuture.foreach { x =>
    val r: List[Task[Response[Either[String, String]]]] = x // List with size 3
    Observable
      .fromIterable(r)
      .throttle(6 second, 1)
      .map(_.runToFuture)
      .subscribe()
  }
  while (true) {}
}
  

And this is how the function for fetching the data looks like:

  def fetch(uri: Uri, auth: String)(implicit
      backend: SttpBackend[Task, Observable[ByteBuffer], WebSocketHandler]
  ) = {
    println(uri)
    val task = basicRequest
      .get(uri)
      .header("accept", "application/json")
      .header("Authorization", auth)
      .response(asString)
      .send()

    task
  }

I have tried running the aforementioned code and I still see that all the get requests are fired without any spacing in between.

For illustration, my current api call logs look something like:

//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)

And I am trying to achieve something similar to:

//(https://mr.foos.api/v1), Sat Aug 08 18:50:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:50:18 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:50:21 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:50:24 CEST 2020)

Update:

  • I have set up the api to be mockable using Beeceptor. And it seems to me that the print statements are made from the calling function but the requests are not actually sent. I have also updated my function call to be parsing as a string (just for simplicity)However, when I try to throttle the request to the mock api it still does not receive any requests.
alt-f4
  • 2,112
  • 17
  • 49
  • 1
    This might be related (but I'm not sure): it seems like your code is throttling to one request every 3 seconds when you want to throttle to one request every three *minutes*. Maybe, because the throttle value is relatively low, they still end up getting batched together, somehow. – Julia Aug 08 '20 at 17:13
  • @Julia hello Julia. I actually just put the minutes only for illustration, but you are right that is confusing to read in the question. I have edited it to be in seconds – alt-f4 Aug 08 '20 at 17:16
  • Oh, okay. I wish that were it. – Julia Aug 08 '20 at 17:17

1 Answers1

2

So if i have understood right you have types like this:

object ObservableTest extends App  {
  type Response = Either[ResponseError[Error], Activities]
  case class Activities()
  val activities: Task[List[Response]] = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    def fetchData(uri: String): Task[Response] = ???
    val ids: Task[List[Int]] = ??? // a Task containing a List of IDs (from a previous step)
    val func: String => Task[Response] = (i: String) => fetchData("someUri") // a function that will be used to call an endpoint
    val data: Task[List[Task[Response]]] = ids map (_ map (id => func(id.toString))) // Maps API calling-function to the ids
    val activitiesData: Task[List[Response]] = data.flatMap(Task.parSequenceUnordered(_)) // Flattenned the previous step
    activitiesData.guarantee(backend.close())
  }
  import monix.execution.Scheduler.Implicits.global
  Observable(activities)
    .throttle(3 second, 1)
    .subscribe()
}

The problem in your code that you throttle the one big Task that contains multiple actions, some of them even parallel (but that not is the root of the problem). Even in types, you can see that - you should make observable from a list of tasks (each of them would be throttled), not the task of the list.

I actually don't know where ids come from and it can be the cornerstone of the evaluation pipeline. But if we have the task with them like in the example. We will do this.

import monix.eval.Task
import sttp.client.asynchttpclient.monix._
import monix.eval.Task._
import monix.reactive.Observable
import sttp.client.ResponseError

import scala.concurrent.duration.DurationInt

object ObservableTest extends App  {
  type Response = Either[ResponseError[Error], Activity]
  case class Activity()
  val activities: Task[List[Task[Response]]] = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    def fetchData(uri: String): Task[Response] = Task {
      println("mocked http request")
      Right(Activity())
    }
    val ids: Task[List[Int]] = Task { (1 to 100).toList} // a Task containing a List of IDs (from a previous step)
    val func: Int => Task[Response] = (i: Int) => fetchData(s"someUri_$i") // a function that will be used to call an endpoint
    val data: Task[List[Task[Response]]] = ids.map(_.map(func)) // Maps API calling-function to the ids
    data.guarantee(backend.close())
  }
  import monix.execution.Scheduler.Implicits.global

  Observable.fromTask(activities)
    .flatMap { listOfFetches: List[Task[Response]]  =>
      Observable.fromIterable(listOfFetches)
    }
    .throttle(3.second, 1)
    .map(_.runToFuture) 
    .subscribe()
  
  while(true) {}
}

We throttle a list of fetches, not the task that does all fetches inside.

PS: Please ask questions what is unclear, I will add comments to the code

Artem Sokolov
  • 810
  • 4
  • 8
  • Hello again Artem! Both your code and approach make sense to me, and also works for the simplified code you have provided. I have tried to adapt it, but still I do not see any throttling. I am starting to doubt that the issue maybe I lies in the function I use to call the API? I have added more parts of my code, and I hope that helps. Please let me know if anything else I can provide – alt-f4 Aug 08 '20 at 22:41
  • I provided an update to my post. I have edited my code a bit to use a mockable api, I hope that makes it easier to run my code. I have also noticed that the print statements seem to execute from the caller but the endpoint does not receive the request. Let me know if I can provide further information – alt-f4 Aug 09 '20 at 06:10
  • 1
    Your code is correct and throttling is actually works. The problem that your println is side effect and it is not composed in the task that you trying to log. So your code just prints when the task is created, but not when it evaluated... – Artem Sokolov Aug 09 '20 at 09:19
  • That seems to be the case, but when I tried testing with beeceptor, it seems that the api received no requests. Not sure if the way I build the observable is correct. Or maybe I need to do something after running it to a future to actually fire the request? – alt-f4 Aug 09 '20 at 13:14
  • I believe the problem is in `data.guarantee(backend.close())` the backend is closed before the requests are made... I will update further when I have more evidence on that. Thanks for the help so far! – alt-f4 Aug 09 '20 at 16:57