I have a paginated resource and I want to consume it recursively with Monix. I want an Observable that emits the downloaded elements one page at a time, fetching each next page recursively. Here is a simple example. It doesn't work as intended: it emits the first page, then the first page plus the second, then the first plus the second plus the third. I want it to emit the first page, then the second, then the third, and so on.
    import monix.execution.Scheduler.Implicits.global
    import monix.reactive.Observable
    import scala.concurrent.Future

    object Main extends App {
      sealed trait Event
      case class Loaded(xs: Seq[String]) extends Event
      // probably should just finish the stream instead of emitting this event
      case object Done extends Event

      // here is the problem
      def consume(page: Int, size: Int): Observable[Event] = {
        Observable.fromFuture(getPaginatedResource(page, size)).concatMap { xs =>
          // note: size + 5 makes every requested page larger than the last (see UPD below)
          if (xs.isEmpty) Observable.pure(Done)
          else Observable.concat(Observable.pure(Loaded(xs)), consume(page + 1, size + 5))
        }
      }

      // mock resource: returns an empty page once page * size exceeds 100
      def getPaginatedResource(page: Int, size: Int): Future[Seq[String]] = Future {
        if (page * size > 100) Seq.empty
        else 0 to size map (x => s"element $x")
      }

      consume(page = 0, size = 5).foreach(println)
    }
Any ideas?
UPD
Sorry, it seems it was working after all; I just had a bug: the recursive call passes size + 5 instead of size. So the problem seems to be solved, but if you see that I'm doing something wrong, please tell me.
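For reference, here is a minimal sketch of what the version without the bug might look like. It assumes Monix 3.x, keeps the page size constant, and completes the stream on an empty page instead of emitting Done, as the comment in the code above suggests. The object name PagedStream and the "page N, element M" labels are made up for illustration, and the mock uses 0 until size so that each page holds exactly size elements.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical name; a sketch, not a definitive implementation.
object PagedStream {
  // Mock resource modeled on the question: an empty page signals the end.
  def getPaginatedResource(page: Int, size: Int): Future[Seq[String]] = Future {
    if (page * size > 100) Seq.empty
    else (0 until size).map(i => s"page $page, element $i")
  }

  // Emits one Seq per page and completes when an empty page arrives.
  def consume(page: Int, size: Int): Observable[Seq[String]] =
    Observable.fromFuture(getPaginatedResource(page, size)).concatMap { xs =>
      if (xs.isEmpty) Observable.empty[Seq[String]]
      else Observable.pure(xs) ++ consume(page + 1, size) // size stays constant
    }

  def main(args: Array[String]): Unit = {
    // foreach returns a future that completes when the stream does;
    // blocking on it keeps the JVM alive until every page is printed.
    val done = consume(page = 0, size = 5).foreach(xs => println(xs.mkString(", ")))
    Await.result(done, 10.seconds)
  }
}
```

With the stream completing on its own, the Event wrapper is no longer needed; if individual elements are wanted rather than whole pages, Observable.fromIterable(xs) could replace Observable.pure(xs).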