5

I have a paginated resource that I want to consume recursively with Monix. I want an Observable that emits the downloaded elements while recursively fetching the following pages. Here is a simple example. It doesn't work, of course: it emits the first page, then the first page + the second page, then the first + second + third. What I want is for it to emit the first page, then the second, then the third, and so on.

/** Demo: consume a paginated resource page by page as a Monix `Observable`. */
object Main extends App {

  /** Events emitted while walking through the pages. */
  sealed trait Event

  /** One downloaded page and the elements it contained. */
  final case class Loaded(xs: Seq[String]) extends Event

  // probably should just finish stream instead of this event
  /** Emitted once, after the first empty page, to mark the end of the data. */
  case object Done extends Event

  /**
   * Emits one `Loaded` event per non-empty page, starting at `page`, followed
   * by a single `Done` when an empty page is returned.
   *
   * @param page index of the first page to fetch
   * @param size number of elements requested per page; it stays constant
   *             across the whole traversal
   */
  def consume(page: Int, size: Int): Observable[Event] =
    Observable.fromFuture(getPaginatedResource(page, size)).concatMap { xs =>
      if (xs.isEmpty) Observable.pure(Done)
      else
        // Recurse with the SAME page size. Passing `size + 5` here made every
        // page larger than the previous one, which looked as if earlier pages
        // were being emitted again.
        Observable.concat(Observable.pure(Loaded(xs)), consume(page + 1, size))
    }

  /**
   * Stub for the remote call: returns `size` generated elements, or an empty
   * sequence once `page * size` exceeds 100.
   */
  def getPaginatedResource(page: Int, size: Int): Future[Seq[String]] = Future {
    if (page * size > 100) Seq.empty
    // `until` rather than `to`: an inclusive range would yield size + 1 items.
    else (0 until size).map(x => s"element $x")
  }

  // NOTE(review): `foreach` runs asynchronously; presumably the JVM has to be
  // kept alive until the stream completes — confirm against the real entry point.
  consume(page = 0, size = 5).foreach(println)

}

Any ideas?

UPD: Sorry, it seems the code is actually working; I just had a bug in it (the `size + 5` in the recursive call). So the problem appears to be solved, but if you see that I'm doing something wrong, please tell me.

Artem Malinko
  • 1,761
  • 1
  • 22
  • 39

1 Answers1

1

It's generally recommended to avoid recursion whenever possible when using Observable, since it's not easy to visualize and is generally more error-prone.

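One idea would be to use scanEvalF, since it emits the accumulated state on each step. (Note that scanEvalF does not emit the seed itself; the scanEval0 / scanEval0F variants emit the seed as the first element.)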

/** Events produced while consuming the paginated resource. */
sealed trait Event

object Event {

  /**
   * A single fetched page.
   *
   * @param page  index of the page that was requested
   * @param size  page size that was requested
   * @param items elements returned for that page; empty once the resource is exhausted
   */
  final case class Loaded(page: Int, size: Int, items: Seq[String]) extends Event
}

/**
 * Stub for the remote call: produces the page identified by `page`.
 *
 * The result carries the requested `page` and `size` so that the next request
 * can be derived from it. `items` is empty once `page * size` exceeds 100.
 *
 * @param page index of the page to fetch
 * @param size number of elements to put on the page
 */
def getPaginatedResource(page: Int, size: Int): Task[Event.Loaded] = Task.pure {
  // `Loaded` lives inside `object Event`, so it is referenced qualified here.
  if (page * size > 100) Event.Loaded(page, size, Seq.empty)
  // `until` rather than `to`: an inclusive range would yield size + 1 items.
  else Event.Loaded(page, size, 0.until(size).map(x => s"element $x"))
}

/**
 * Streams pages one after another, starting at `page`, and completes right
 * after the first page whose `items` is empty (that page is still emitted).
 *
 * @param page index of the first page to fetch
 * @param size number of elements requested per page; kept constant for every page
 */
def consume(page: Int, size: Int): Observable[Event] = {
  Observable
    .interval(0.seconds) // only used as an endless source of ticks driving the scan
    // `scanEval0` emits the seed as well; plain `scanEvalF` would fetch the
    // first page but never emit it, silently dropping it from the stream.
    .scanEval0(getPaginatedResource(page, size)) { (previous, _) =>
      // Same page size for every request; growing it (`size + 5`) makes the
      // pages overlap and look like repeated output.
      getPaginatedResource(previous.page + 1, previous.size)
    }
    .takeWhileInclusive(_.items.nonEmpty) // stop after the first empty page
}

// Start at the first page with five elements per page, print every emitted
// event, and run the resulting task as a future.
consume(page = 0, size = 5)
  .foreachL(event => println(event))
  .runToFuture
atl
  • 326
  • 1
  • 9