
I am using a library that provides a Traversable[T] that pages through database results. I'd like to avoid loading the whole thing into memory, so I am trying to convert it to a Stream[T].

From what I can tell, the built-in toStream method loads the whole Traversable into a Buffer, which defeats my purpose. My attempt (below) hits a StackOverflowError on large results, and I can't tell why. Can someone help me understand what is going on? Thanks!

def asStream[T](traversable: => Traversable[T]): Stream[T] = {
  if (traversable.isEmpty) Empty
  else {
    lazy val head = traversable.head
    lazy val tail = asStream(traversable.tail)
    head #:: tail
  }
}

Here's a complete example that reproduces this, based on a suggestion by @SCouto:

import scala.collection.immutable.Stream.Empty

object StreamTest {
  def main(args: Array[String]) = {
    val bigVector = Vector.fill(90000)(1)
    val optionStream = asStream(bigVector).map(v => Some(v))
    val zipped = optionStream.zipAll(optionStream.tail, None, None)
  }

  def asStream[T](traversable: => Traversable[T]): Stream[T] = {
    @annotation.tailrec
    def loop(processed: => Stream[T], pending: => Traversable[T]): Stream[T] = {
      if (pending.isEmpty) processed
      else {
        lazy val head = pending.head
        lazy val tail = pending.tail
        loop(processed :+ head, tail)
      }
    }

    loop(Empty, traversable)
  }
}

Edit: After some interesting ideas from @SCouto, I learned this can also be done with trampolines, which keeps the result as a Stream[T] in the original order:

object StreamTest {
  def main(args: Array[String]) = {
    val bigVector = Range(1, 90000).toVector
    val optionStream = asStream(bigVector).map(v => Some(v))
    val zipped = optionStream.zipAll(optionStream.tail, None, None)
    zipped.take(10).foreach(println)
  }

  def asStream[T](traversable: => Traversable[T]): Stream[T] = {
    sealed trait Traversal[+R]
    case class More[+R](result: R, next: () => Traversal[R]) extends Traversal[R]
    case object Done extends Traversal[Nothing]

    def next(currentTraversable: Traversable[T]): Traversal[T] = {
      if (currentTraversable.isEmpty) Done
      else More(currentTraversable.head, () => next(currentTraversable.tail))
    }

    def trampoline[R](body: => Traversal[R]): Stream[R] = {
      def loop(thunk: () => Traversal[R]): Stream[R] = {
        thunk.apply match {
          case More(result, next) => Stream.cons(result, loop(next))
          case Done => Stream.empty
        }
      }
      loop(() => body)
    }

    trampoline(next(traversable))
  }
}
George Hilios

2 Answers

1

Try this:

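  import scala.collection.immutable.Stream.Empty

  def asStream[T](traversable: => Traversable[T]): Stream[T] = {

    // Tail-recursive helper: the recursive call is the last action,
    // so the compiler turns it into a loop.
    @annotation.tailrec
    def loop(processed: Stream[T], pending: Traversable[T]): Stream[T] = {
      if (pending.isEmpty) processed
      else {
        // Prepend the current element; the result comes out in reverse order.
        loop(pending.head #:: processed, pending.tail)
      }
    }

    loop(Empty, traversable)
  }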

The main point is to ensure that your recursive call is the last action of your recursive function.

To ensure this, you can use a nested method (called loop in the example) together with the tailrec annotation, which makes the compiler reject the method if the recursive call is not in tail position.

You can find info about tail rec here and in this awesome answer here
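As a small, generic illustration of that rule (the names sumNaive and sumAcc are made up for this sketch and are not part of the question), compare a recursive call that still has work to do after it returns with one that is genuinely the last action:

```scala
import scala.annotation.tailrec

object TailRecSketch {
  // Not tail-recursive: the addition runs after the recursive call returns,
  // so every element needs its own stack frame.
  def sumNaive(xs: List[Int]): Long = xs match {
    case Nil    => 0L
    case h :: t => h + sumNaive(t)
  }

  // Tail-recursive: the running total travels in an accumulator, so the
  // recursive call is the last action. @tailrec makes compilation fail
  // if the compiler cannot rewrite the method into a loop.
  @tailrec
  def sumAcc(xs: List[Int], acc: Long = 0L): Long = xs match {
    case Nil    => acc
    case h :: t => sumAcc(t, acc + h)
  }
}
```

Adding @tailrec to sumNaive should be rejected at compile time, which is exactly the safety net the annotation provides.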

EDIT: The problem was that we were appending each element to the end of the Stream. If you prepend it as the head of the Stream instead, as in your example, it works fine. I updated my code; please test it and let us know the result.
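One consequence of prepending is worth spelling out: the loop visits every element before it returns, and the last element visited ends up at the front. The sketch below expresses the same prepend step as a left fold over a small collection (prependAll is a hypothetical helper name, used only for illustration):

```scala
object PrependOrderSketch {
  // Same step as the loop: each element becomes the new head of the
  // accumulated Stream, so the input is consumed fully and comes out reversed.
  def prependAll[T](xs: Iterable[T]): Stream[T] =
    xs.foldLeft(Stream.empty[T])((acc, x) => x #:: acc)
}
```

With List(1, 2, 3) as input this yields the elements as 3, 2, 1, so reverse the input (or the result) if the order matters.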

My tests:

scala> val optionStream = asStream(Vector.fill(90000)(1)).map(v => Some(v))
optionStream: scala.collection.immutable.Stream[Some[Int]] = Stream(Some(1), ?)

scala> val zipped = optionStream.zipAll(optionStream.tail, None, None)
zipped: scala.collection.immutable.Stream[(Option[Int], Option[Int])] = Stream((Some(1),Some(1)), ?)

EDIT2:

Based on your comments and the fpinscala example you mentioned, I think this may help. The idea is to create a case class structure with lazy evaluation, where the head is a single element and the tail is a Traversable:

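sealed trait myStream[+T] {
  // Some(first element), or None for the empty stream.
  def head: Option[T] = this match {
    case MyEmpty => None
    case MyCons(h, _) => Some(h())
  }

  // The rest of the stream; the underlying Traversable is only touched here.
  def tail: myStream[T] = this match {
    case MyEmpty => MyEmpty
    case MyCons(_, t) =>
      val rest = t()
      // Guard against an exhausted Traversable, where rest.head would throw.
      if (rest.isEmpty) MyEmpty else myStream.cons(rest.head, rest.tail)
  }
}
case object MyEmpty extends myStream[Nothing]
case class MyCons[+T](h: () => T, t: () => Traversable[T]) extends myStream[T]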

object myStream {

  def cons[T](hd: => T, tl: => Traversable[T]): myStream[T] = {
    lazy val head = hd
    lazy val tail = tl

    MyCons(() => head, () => tail)
  }

  def empty[T]: myStream[T] = MyEmpty

  def apply[T](as: T*): myStream[T] = {
    if (as.isEmpty) empty
    else cons(as.head, as.tail)
  }
}

Some quick tests:

val bigVector = Vector.fill(90000)(1)
myStream.cons(bigVector.head, bigVector.tail)
res2: myStream[Int] = MyCons(<function0>,<function0>)

Retrieving head:

res2.head
res3: Option[Int] = Some(1)

And the tail:

res2.tail
res4: myStream[Int] = MyCons(<function0>,<function0>)

EDIT3:

The trampoline solution by the OP:

  def asStream[T](traversable: => Traversable[T]): Stream[T] = {
    // One step of the traversal: an element plus a thunk for the rest, or the end.
    sealed trait Traversal[+R]
    case class More[+R](result: R, next: () => Traversal[R]) extends Traversal[R]
    case object Done extends Traversal[Nothing]

    def next(currentTraversable: Traversable[T]): Traversal[T] = {
      if (currentTraversable.isEmpty) Done
      else More(currentTraversable.head, () => next(currentTraversable.tail))
    }

    // Stream.cons takes its tail by name, so loop(next) only runs when the tail is forced.
    def trampoline[R](body: => Traversal[R]): Stream[R] = {
      def loop(thunk: () => Traversal[R]): Stream[R] = {
        thunk() match {
          case More(result, next) => Stream.cons(result, loop(next))
          case Done => Stream.empty
        }
      }
      loop(() => body)
    }

    trampoline(next(traversable))
  }
SCouto
  • Interesting idea - I tried something similar. Your approach still seems to stack overflow though. Did this work for you? – George Hilios Jun 04 '18 at 14:12
  • Just made some quick tests. Can you post somehow your test dataset? – SCouto Jun 04 '18 at 14:14
  • I added a complete example on the post – George Hilios Jun 04 '18 at 14:27
  • I updated my answer, please check it and let us know the result – SCouto Jun 04 '18 at 14:53
  • Nice! Thanks! In hindsight, of course that would be the case. I think this is close - before I mark it as the accepted answer (in case what I want isn't possible), the stream seems to be a reversal of the original data. To me this implies that the original Traversable is being fully traversed before the Stream even starts. Am I interpreting that correctly? I added a foreach(println) and confirmed that the order is in reverse – George Hilios Jun 04 '18 at 17:00
  • Indeed. The resulting stream will be in reverse order. If you want it in the proper order, you can reverse the input traversable: asStream(Vector.fill(90000)(1).reverse) – SCouto Jun 04 '18 at 17:02
  • Well @Scouto, my point was a bit different - if my goal is to avoid traversing the whole Traversable (coming from Scalike JDBC) because doing so forces all of the pages to be loaded into memory, is what I'm trying to do even possible? I wonder if I need to implement something "Streamlike" that keeps a reference to Traversable as you pull from the Stream. – George Hilios Jun 04 '18 at 21:01
  • One other thought: perhaps this is a use case for a Generator whose state includes pending.tail? I think this was a topic in the fpinscala book – George Hilios Jun 04 '18 at 21:08
  • I understand your point. I made another edit creating a case class with lazy evaluation while keeping the tail as the original traversable. It's a bit rough, but I think it may help. – SCouto Jun 05 '18 at 06:02
  • Your answer looks solid - let me try it out, as well as something like a trampoline (https://stackoverflow.com/questions/2201882/implementing-yield-yield-return-using-scala-continuations/). Ideally this implements Stream[T] so all of the existing library methods (including map) work. – George Hilios Jun 07 '18 at 03:34
  • The trampoline worked! What do you think of the code I added to my original post? If you add it and your solution, I can mark this as the answer – George Hilios Jun 07 '18 at 03:57
  • Looks awesome and really scala-ish. I'll take note of that for future uses. – SCouto Jun 07 '18 at 05:53
-1

A Stream avoids holding all the data in memory only because you declare how to generate each item on demand. Your database data is very likely not procedurally generated, so what you need is to fetch each item the first time it is asked for (something like def getData(index: Int): Future[Data]).

The bigger problem is that, since you are fetching data from a database, you are probably using Futures. Even if you manage to do it, you would end up with a Future[Stream[Data]], which is not that nice to use, or, much worse, you would have to block on it.

Wouldn't it be more worthwhile just to paginate your database query?
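A rough sketch of what paginating could look like if you still want a Stream at the end, assuming a blocking page loader. fetchPage, table and pagesFetched are hypothetical stand-ins (an in-memory Vector replaces the real LIMIT/OFFSET query); none of them is ScalikeJDBC API:

```scala
object PagedStreamSketch {
  // Hypothetical data source standing in for a database table.
  val table: Vector[Int] = (1 to 10).toVector

  // Counts how many pages were requested, to make the laziness observable.
  var pagesFetched = 0

  // Hypothetical page loader; real code would run a LIMIT/OFFSET query here.
  def fetchPage(offset: Int, pageSize: Int): Vector[Int] = {
    pagesFetched += 1
    table.slice(offset, offset + pageSize)
  }

  // The right-hand side of #::: is taken by name, so the next page is only
  // fetched once a consumer walks past the end of the current page.
  def paged(offset: Int, pageSize: Int): Stream[Int] = {
    val page = fetchPage(offset, pageSize)
    if (page.isEmpty) Stream.empty
    else page.toStream #::: paged(offset + pageSize, pageSize)
  }
}
```

For example, PagedStreamSketch.paged(0, 3).take(2).toList should only request the first page, while a full traversal requests pages until an empty one comes back. Note that a Stream memoizes the elements it has produced, so holding on to its head still keeps every visited row in memory.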

RoberMP
  • I don't think you're addressing @GeorgeHilios' question – Rajit Jun 04 '18 at 14:03
  • Some context: I am using another library for this (scalike jdbc) – George Hilios Jun 04 '18 at 14:17
  • I think that I do, @Rajit, based on `I'd like to avoid loading the whole thing into memory, so I am trying to convert it to a Stream[T].`. My answer is that this is not possible if you just add the elements from a Traversable. That only changes the data structure; the data is still held in memory (it is not magically generated later). The `StackOverflowError` is not the point, even if it's the final question. – RoberMP Jun 04 '18 at 14:29
  • Fair enough! FWIW my reasoning is that the title of the question is about Traverse and a stack overflow. And the question in the description explicitly asks about the stack overflow. But I totally understand where you're coming from – Rajit Jun 04 '18 at 14:31
  • @George Hilios, then you are probably fine blocking the future, since the JDBC drivers are blocking anyway :). If you are interested, I could think of a way to implement it later (it's an interesting exercise). But I would rather suggest that you just paginate your data. – RoberMP Jun 04 '18 at 14:32