
I'm reading a very large number of records sequentially from a database API, one page at a time (with an unknown number of records per page), via a call to def readPage(pageNumber: Int): Iterator[Record]

I'm trying to wrap this API lazily, in something like Stream[Iterator[Record]] or Iterator[Iterator[Record]], in a functional way (ideally with no mutable state) and with a constant memory footprint, so that I can treat it as an infinite stream of pages, or a sequence of Iterators, and abstract the pagination away from the client. The client can iterate over the result: each call to next() retrieves the next page (an Iterator[Record]).

What is the most idiomatic and efficient way to implement this in Scala?

Edit: I need to fetch and process the records one page at a time; I cannot keep the records from all pages in memory. If one page fails, throw an exception. A large number of pages/records means infinite for all practical purposes. I want to treat it as an infinite stream (or iterator) of pages, with each page being an iterator over a finite number of records (e.g. fewer than 1000, but the exact number is not known ahead of time).

I looked at BatchCursor in Monix but it serves a different purpose.
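For comparison, the requirements in this edit (lazy, one page at a time, no memoization, an exception when a page fails) can be sketched with a plain Iterator in Scala 2.13. Record, the page contents and readPageCalls are hypothetical stand-ins for the real API; the counter is mutable only to make the laziness visible in the demo.

```scala
final case class Record(id: Int)

var readPageCalls = 0 // mutable only to make the laziness visible in this demo

// Hypothetical stand-in for the real API: pages 0 and 1 succeed, page 2 throws.
def readPage(pageNumber: Int): Iterator[Record] = {
  readPageCalls += 1
  pageNumber match {
    case 0 => Iterator.tabulate(3)(i => Record(i))
    case 1 => Iterator.tabulate(2)(i => Record(10 + i))
    case n => throw new IllegalStateException(s"failed to read page $n")
  }
}

// Iterator.from(0) counts 0, 1, 2, ... lazily; map calls readPage only when the
// outer iterator advances, and takeWhile stops at the first empty page.
// The outer Iterator does not memoize, so it does not retain consumed pages,
// and an exception from readPage surfaces from the outer hasNext/next.
def allPages(): Iterator[Iterator[Record]] =
  Iterator.from(0).map(readPage).takeWhile(_.hasNext)
```

Because nothing is memoized, each page can be traversed only once. That matches the one-page-at-a-time requirement, but it also means pages cannot be replayed without calling readPage again.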

Edit 2: this is the current version. It uses Tomer's answer below as a starting point, but with Stream instead of Iterator. This eliminates the need for tail recursion, as per https://stackoverflow.com/a/10525539/165130, and gives O(1) time for the stream prepend operation #:: (whereas concatenating iterators via ++ would be O(n)).

Note: while streams are lazily evaluated, Stream memoization may still cause memory to blow up, so memory management gets tricky. Changing from val to def when defining the Stream (def pages = readAllPages() below) doesn't seem to have any effect.

def readAllPages(pageNumber: Int = 0): Stream[Iterator[Record]] = {
  val iter: Iterator[Record] = readPage(pageNumber)
  if (iter.isEmpty)
    Stream.empty // an empty page marks the end of the data
  else
    iter #:: readAllPages(pageNumber + 1) // the tail is evaluated lazily
}
      
// usage
val pages = readAllPages()
for {
  page <- pages
  record <- page
  if isValid(record)
} process(record)
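Since Stream is deprecated in Scala 2.13 in favour of LazyList, here is a sketch of the same recursion with LazyList, using a hypothetical Record and readPage. It also illustrates one consequence of the memoization mentioned in the note: a val that holds the head keeps every evaluated cell, including the page iterators themselves, so a second traversal only sees exhausted pages.

```scala
final case class Record(id: Int)

// Hypothetical stand-in for the real API: pages 0 and 1 hold two records each,
// page 2 is empty and marks the end.
def readPage(pageNumber: Int): Iterator[Record] =
  if (pageNumber < 2) Iterator(Record(pageNumber * 10), Record(pageNumber * 10 + 1))
  else Iterator.empty

// Same recursion as the Stream version, with LazyList (Scala 2.13+).
// The tail of #:: is evaluated lazily, so readAllPages(pageNumber + 1)
// runs only when the tail is first inspected.
def readAllPages(pageNumber: Int = 0): LazyList[Iterator[Record]] = {
  val iter = readPage(pageNumber)
  if (!iter.hasNext) LazyList.empty
  else iter #:: readAllPages(pageNumber + 1)
}

// A val pins the head, so every evaluated cell stays memoized, including the
// page iterators. The second traversal re-uses the already exhausted iterators.
val memoized = readAllPages()
val firstPass = memoized.flatMap(_.toList).toList
val secondPass = memoized.flatMap(_.toList).toList

// Building a fresh LazyList and traversing it through .iterator keeps no
// reference to its head at this call site, and re-reads every page.
val fresh = readAllPages().iterator.flatten.toList
```

Whether consumed cells can actually be garbage-collected depends on nothing else (such as a val like pages) still referencing the head, which is one reason holding the stream in a val can make memory use grow with the number of pages traversed.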
 

Edit 3: the second suggestion by Tomer seems to be the best. Its runtime and memory footprint are similar to the solution above, but it is much more concise and less error-prone.

val pages = Stream.from(1).map(readPage).takeWhile(_.nonEmpty)

Note: Stream.from(1) creates a stream that starts at 1 and increments by 1; this is described in the API docs.
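In Scala 2.13, where Stream is deprecated in favour of LazyList, the one-liner translates directly. One detail worth checking: this version starts at page 1 while readAllPages above starts at 0, so the start value should match whatever numbering the real readPage uses (not stated in the question). Record and readPage below are hypothetical stand-ins.

```scala
final case class Record(id: Int)

// Hypothetical stand-in: pages 1 to 3 hold one record each, page 4 is empty.
def readPage(pageNumber: Int): Iterator[Record] =
  if (pageNumber >= 1 && pageNumber <= 3) Iterator.single(Record(pageNumber))
  else Iterator.empty

// LazyList.from(1) yields 1, 2, 3, ... lazily; map and takeWhile are lazy too,
// so pages are read only as far as the consumer traverses.
// On an Iterator, nonEmpty is !isEmpty, i.e. hasNext, so the check does not
// consume a record.
def pages: LazyList[Iterator[Record]] =
  LazyList.from(1).map(readPage).takeWhile(_.hasNext)
```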

alex
  • The answer is: it depends. What do you want to achieve? Do you want all of the pages to be read at the same time? One by one? What happens if one page fails? Do you start over? Retry only that page? What counts as a large number of records? Can all of the records fit in your machine's memory at once? If not, you have to make sure to finish one page before you fetch the next one. – Tomer Shetah Jan 17 '21 at 10:03
  • I want to fetch and process the records one page at a time; I cannot keep all the pages in memory. If one page fails, throw an exception. By a large number of records I mean infinite for all practical purposes; I want to treat it as an infinite stream. – alex Jan 17 '21 at 10:08
  • Streaming is a solution only if your client can stay connected to the server indefinitely. Then you would either stream everything through one response, or use e.g. websockets to respond with the next batch of results when the client wants them. If you cannot keep one connection to the server and you use pagination, then you would have to store state on your server, namely a DB connection with a cursor, which means that you cannot handle too many clients. That is why pagination usually means separate DB requests with LIMIT and OFFSET (where results can change between queries). – Mateusz Kubuszok Jan 17 '21 at 11:13
  • So what you want to use mostly depends on how your API consumers would have to use it. – Mateusz Kubuszok Jan 17 '21 at 11:15
  • For just creating the Stream you could use the `unfold` method on either **Stream**, **LazyList**, or **Iterator**. But, as others have pointed out, returning this from a REST API depends on your framework. Also, exposing an iterator is usually a bad idea, since if it is consumed by mistake you will get errors. Finally, the `unfold` solution would not handle errors; if you really want a more functional API it would be good to return an `Either` or an `Option` of a `List` for each page. – Luis Miguel Mejía Suárez Jan 17 '21 at 14:59

1 Answer


You can try implementing logic like this:

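def readPage(pageNumber: Int): Iterator[Record] = ???

// No @tailrec: the recursive call is not in tail position, so the annotation
// would not compile. Iterator's ++ takes its argument by name, so the next
// page is only read when the consumer asks for more pages.
def readAllPages(pageNumber: Int): Iterator[Iterator[Record]] = {
  val iter = readPage(pageNumber)
  if (iter.nonEmpty) {
    // Hand out the current page; the remaining pages are fetched lazily
    Iterator(iter) ++ readAllPages(pageNumber + 1)
  } else {
    Iterator.empty
  }
}

readAllPages(0)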
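To see that this Iterator version defers each recursive call until more pages are requested, here is a sketch with a hypothetical Record and a readPage that counts its calls (the counter is mutable state used only for the demonstration, assuming Scala 2.13).

```scala
final case class Record(id: Int)

var readPageCalls = 0 // counts calls to show when pages are fetched (demo only)

// Hypothetical stand-in: pages 0 to 2 hold one record each, page 3 is empty.
def readPage(pageNumber: Int): Iterator[Record] = {
  readPageCalls += 1
  if (pageNumber < 3) Iterator.single(Record(pageNumber)) else Iterator.empty
}

// Same shape as the answer's code. Because ++ takes its right-hand side by
// name, readAllPages(pageNumber + 1) runs only after the current page has been
// handed out and the consumer calls hasNext/next again.
def readAllPages(pageNumber: Int): Iterator[Iterator[Record]] = {
  val iter = readPage(pageNumber)
  if (iter.hasNext) Iterator(iter) ++ readAllPages(pageNumber + 1)
  else Iterator.empty
}
```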

A shorter option would be:

Stream.from(1).map(readPage).takeWhile(_.nonEmpty)
Tomer Shetah
  • I like it for its simplicity. The only thing is that it needs to return Iterator[Iterator[Record]] instead of Unit, so that the client can iterate over the result, with each call to next() retrieving the next page. – alex Jan 17 '21 at 17:55
  • Thanks. By the way, your first solution is not tail recursive ("Recursive call is not in tail position"). It can be made tail recursive by adding a buffer-like argument, but it's better to use Stream, as it eliminates the need for tail recursion altogether, as I've just learned from this answer: https://stackoverflow.com/a/10525539/165130 – alex Jan 17 '21 at 19:54
  • Yes, it helped; I'm going to fix it and post. – alex Jan 17 '21 at 19:58
  • Thanks! I accepted the second suggestion, as it seems to be the best, and edited my question above. – alex Jan 18 '21 at 02:37