Basically I want to convert this:

def data(block: T => Unit)

to a Stream (dataToStream is a hypothetical function that does this conversion):

val dataStream: Stream[T] = dataToStream(data)

I suppose this problem could be resolved by continuations:

// let's assume that we don't know how data is implemented
// we just know that it generates integers
def data(block: Int => Unit) { for (i <- 0 to 10) block(i) }

// here we can print all data integers
data { i => println(i) }

// >> but what we really want is to convert data to the stream <<

// a very naive solution is to collect all the data into a list
var dataList = List[Int]()
data { i => dataList = i :: dataList }
// and make a stream from it (prepending reversed the order, so restore it first)
dataList.reverse.toStream

// but we want to make a lazy, CPU- and memory-efficient stream or iterator from data
val dataStream: Stream[Int] = dataToStream(data)
dataStream.foreach { i => println(i) }

// and here the black magic of continuations must be used
// for me this magic is too hard to understand
// Does anybody know what the dataToStream function could look like?

Thanks, Dawid

Dawid Grzesiak
  • Bounty for threadless answers, or convincing argument that there aren't any. – Dave Griffith Sep 28 '10 at 18:27
  • Your "block" produces no value. How can that be turned into a stream? Unit is a singleton. – Randall Schulz Sep 28 '10 at 18:38
  • The stream desired is the series of arguments that are being sent to "block", not the results of those calls. – Dave Griffith Sep 28 '10 at 19:02
  • Why do you require a Stream? Any special reason? Traversable or TraversableView give you a lot of flexibility. On a view, map, flatMap, filter, etc. are lazy. It uses exceptions to prevent every call to "block" when calling methods like take. All in all, the need for Stream seems frivolous here, and it necessitates either (A) using threads to be able to swap the stack back and forth between the "data" function and the stream iteration, or (B) buffering all the values and creating a Stream from this buffer. This is more a matter of what tools you have on the JVM, although I'd love to be surprised. – jsuereth Sep 29 '10 at 00:54
  • It was just an example. I don't care whether I end up with a Stream, an Iterator or a Traversable. The essence is to convert a data generator to a lazy, memory- and CPU-efficient "stream of data". – Dawid Grzesiak Sep 29 '10 at 20:42
  • @Randall Schulz => Converting a block that produces a value to a Stream is an easy task. See http://gist.github.com/603527 – Dawid Grzesiak Sep 29 '10 at 20:46

4 Answers

EDITED: Modified the examples to show the laziness of traversable.view

scala> def data(f : Int => Unit) = for(i <- 1 to 10) {    
     |   println("Generating " + i)
     |   f(i)
     | }
data: (f: (Int) => Unit)Unit

scala> def toTraversable[T]( func : (T => Unit) => Unit) = new Traversable[T] {
     |   def foreach[X]( f : T => X) = func(f(_) : Unit)                       
     | }                                                                       
toTraversable: [T](func: ((T) => Unit) => Unit)java.lang.Object with Traversable[T]

The toTraversable method will convert your data function into a Traversable collection. By itself, it's nothing huge, but you can convert this to a TraversableView which is lazy. Here's an example:

scala> toTraversable(data).view.take(3).sum
Generating 1
Generating 2
Generating 3
Generating 4
res1: Int = 6

The unfortunate nature of the take method is that it must go one past the last value generated to work correctly, but it will terminate early. The above code would look the same without the ".view" call. However, here's a more compelling example:

scala> toTraversable(data).view.take(2).foreach(println)
Generating 1
1
Generating 2
2
Generating 3
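
The early exit that take performs can also be sketched by hand. What follows is only an illustration, not how TraversableView is implemented: takeFirst is a hypothetical helper, and it uses scala.util.control.Breaks to throw a control-flow exception out of the generator once enough values have arrived. Unlike view.take, this version stops exactly at the n-th value instead of going one past it.

```scala
import scala.util.control.Breaks.{break, breakable}
import scala.collection.mutable.ListBuffer

// Same shape as the generator above; `generated` counts how many values it produced.
var generated = 0
def data(block: Int => Unit): Unit =
  for (i <- 1 to 10) { generated += 1; block(i) }

// Hypothetical helper: collect the first n values, then abort the generator
// by throwing the control-flow exception behind `break`.
def takeFirst[T](n: Int)(gen: (T => Unit) => Unit): List[T] = {
  val buf = ListBuffer[T]()
  if (n > 0) breakable {
    gen { x =>
      buf += x
      if (buf.size >= n) break()  // unwinds through `gen` up to `breakable`
    }
  }
  buf.toList
}

val firstThree = takeFirst[Int](3)(data)
```

After this runs, firstThree holds the first three values and generated is 3. The trick relies on the generator not swallowing the exception: a data function that catches Throwable around its callback would defeat it.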

So in conclusion, I believe the collection you're looking for is TraversableView, which is easiest to create by making a Traversable and then calling "view" on it. If you really want the Stream type, here's a method that works in 2.8.0.final and will make a "Stream" without threads:

scala> def dataToStream( data : (Int => Unit) => Unit) = {
     |   val x = new Traversable[Int] {                     
     |     def foreach[U](f : Int => U) = {                 
     |        data( f(_) : Unit)                            
     |     }
     |   }
     |   x.view.toList.toStream                             
     | }
dataToStream: (data: ((Int) => Unit) => Unit)scala.collection.immutable.Stream[Int]

scala> dataToStream(data)
res8: scala.collection.immutable.Stream[Int] = Stream(0, ?)

The unfortunate nature of this method is that it will iterate over the entire traversable before making the stream. This also means all the values need to be buffered in memory. The only alternative is to resort to threads.

As an aside: This was the motivating reason to prefer Traversables as direct returns from scalax.io.File methods: "lines" "chars" and "bytes".

jsuereth
  • As you see, data is first evaluated and then converted to the Stream. So there is no laziness here. – Dawid Grzesiak Sep 29 '10 at 18:46
  • My point is that you can interact with the data as a "stream" if you use TraversableView. By requiring the type "Stream" you're limiting yourself. TraversableView *is* lazy. – jsuereth Sep 29 '10 at 20:54
  • If a traversable view doesn't look lazy in the REPL, it's because the REPL calls "toString" on resulting expressions, and this causes the TraversableView to traverse the entire collection (displaying all the values). If you develop a function using TraversableView you will see its laziness. – jsuereth Sep 29 '10 at 20:56
  • Hmm, it is really not a bad idea. Sometimes this solution will be sufficient (especially when you want to traverse all the data in a row) and sometimes not. See http://gist.github.com/603569 Ideally the last example's output should be interlaced as well. It is a pity that you can't make a Stream or Iterator for it, or rather you can, but it will evaluate all the data first. If you have a Stream/Iterator, you can use two or more data streams in parallel. For example, take(3) from one Iterator and take(10) from the other. Anyway, it is a great and helpful piece of code! – Dawid Grzesiak Sep 29 '10 at 21:29
  • With threads, when you don't consume all the data, the thread is not stopped but suspended. So that has drawbacks too... – Dawid Grzesiak Sep 29 '10 at 21:48
  • You forgot to call .view on the traversable. This will make it lazy and interleave the results. Without calling .view, all methods on collections are "eager" and will generate intermediate collections. In this case, your call to take is executing immediately. First call view then take. – jsuereth Sep 29 '10 at 23:24
  • @jsuereth In general, I sometimes wonder whether I should call the .view or the .toStream method. Which one is more efficient? I see that the result of .view is strictly bound to its subject, for example Traversable and TraversableView. So all *View classes must be prepared in advance, which is done by the Scala creators. – Dawid Grzesiak Sep 30 '10 at 08:31
  • I would avoid calling toStream unless you are sure you can fit the entire collection in memory and wish to do so. – jsuereth Sep 30 '10 at 11:43
  • My question was apart from the presented solution. I mean, what are the pros and cons of calling .view and .toStream in general? Streams are lazy and memory efficient. – Dawid Grzesiak Sep 30 '10 at 12:32
  • Streams are lazy and memory efficient; however, the default implementation of toStream must buffer the entire collection before creating a stream. The Iterable interface *can* make toStream appropriately lazy. Most collections extend Iterable, so in that case you are right, they are lazy. Streams, however, are not as memory efficient as you might think. They memoize, that is, they retain previously generated values. In this case view is more memory efficient in that it does not retain generated values. – jsuereth Sep 30 '10 at 14:05
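
The memoization difference described in the last comment can be observed with a small counter experiment. This is only a sketch; the names source, doubledStream and doubledView are made up for illustration. Note that on Scala 2.13 and later, Stream is deprecated in favour of LazyList, so toStream compiles with a warning there.

```scala
val source = List(1, 2, 3)

// Stream memoizes: each element is computed once and then retained.
var streamEvaluations = 0
val doubledStream = source.toStream.map { x => streamEvaluations += 1; x * 2 }
val s1 = doubledStream.toList
val s2 = doubledStream.toList   // second pass reuses the cached cells

// A view recomputes its elements on every traversal and retains nothing.
var viewEvaluations = 0
val doubledView = source.view.map { x => viewEvaluations += 1; x * 2 }
val v1 = doubledView.toList
val v2 = doubledView.toList     // second pass runs the function again
```

After both passes, streamEvaluations is 3 and viewEvaluations is 6. The Stream pays for its cache in memory for as long as doubledStream stays reachable, while the view pays in repeated computation.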

Here's a simple solution that spawns a thread that consumes the data. It posts the data to a SynchronousQueue. A stream that pulls data from the queue is then created and returned:

def generatortostream[T](f: (T=>Unit)=>Unit): Stream[T] = {
  val queue = new java.util.concurrent.SynchronousQueue[Option[T]]
  val callbackthread = new Runnable {
    // run the generator; each value is handed to the consumer, None marks the end
    def run() { f((Some(_:T)) andThen (queue.put(_))); queue.put(None) }
  }
  new Thread(callbackthread).start()
  Stream.continually(queue.take).takeWhile(_.isDefined).map(_.get)
}
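
The same queue hand-off can be sketched with an Iterator instead of a Stream, which avoids retaining values that were already consumed. This is a variation under assumptions, not part of the answer above: generatorToIterator is a hypothetical name, and marking the producer as a daemon thread is one possible way to keep a half-consumed generator from blocking JVM shutdown.

```scala
import java.util.concurrent.SynchronousQueue

// Hypothetical helper: same hand-off as above, exposed as an Iterator.
def generatorToIterator[T](f: (T => Unit) => Unit): Iterator[T] = {
  val queue = new SynchronousQueue[Option[T]]
  val producer = new Thread(new Runnable {
    def run(): Unit = {
      f(x => queue.put(Some(x)))  // blocks until the consumer takes the value
      queue.put(None)             // end-of-data marker
    }
  })
  // Daemon thread: an abandoned, still-blocked producer will not keep the JVM alive.
  producer.setDaemon(true)
  producer.start()
  Iterator.continually(queue.take()).takeWhile(_.isDefined).map(_.get)
}

// The generator from the question.
def data(block: Int => Unit): Unit = for (i <- 0 to 10) block(i)

val firstThree = generatorToIterator[Int](data).take(3).toList
val all = generatorToIterator[Int](data).toList
```

Here firstThree only pulls three values, and the producer thread then stays parked on its next put. If the generator throws, the consumer waits forever in this sketch, so a real version would need to forward the failure through the queue as well.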
Geoff Reedy
  • Because of CPS limitations this may be the only solution for Scala up to v2.8. Unfortunately it is 170x slower than using a pure generator. See https://gist.github.com/a79c0a9669eea3d47eee – Dawid Grzesiak Sep 27 '10 at 19:17

I still have to figure out how to do that myself. I suspect the answer lies somewhere here:

Edit: removed code that showed how to solve a different problem.

Edit2: Using the code at http://gist.github.com/580157, which was initially posted at http://gist.github.com/574873, you can do this:

object Main {
  import Generator._

  def data = generator[Int] { yld =>
    for (i <- suspendable(List.range(0, 11))) yld(i)
  }

  def main(args: Array[String]) {
    for( i <- data.toStream ) println(i)
  }
}

data does not take a block of code, but I think this is fine because, with the continuation, the block can be handled by the caller. The code for Generator can be seen in the gist on github.

huynhjl
  • Ehrm, didn't you solve a totally different problem than that of the OP? The OP's `data` function called the `block` function ten times and he wanted to turn that into a stream of ten elements. Your `data` function only calls `block` once. – sepp2k Sep 26 '10 at 16:58
  • @sepp2k, err, yes indeed. I guess continuations are necessary, then. – huynhjl Sep 26 '10 at 17:27
  • I tried to use code from this thread http://stackoverflow.com/questions/2201882/implementing-yield-yield-return-using-scala-continuations/3758084 but without success – Dawid Grzesiak Sep 26 '10 at 20:14
  • Yes, I tried that before. Unfortunately it doesn't resolve the problem because of CPS limitations. See the code http://gist.github.com/599575 It returns the error: type mismatch; found : Unit @scala.util.continuations.cpsParam[Unit,Unit] required: Unit data { i => yld(i) } – Dawid Grzesiak Sep 27 '10 at 18:42
  • @Dawid See the comment I added to that snippet. – Daniel C. Sobral Sep 27 '10 at 22:01

Here's a delimited continuations-based implementation, adapted from @Geoff Reedy's offering:

import Stream._
import scala.util.continuations._
import java.util.concurrent.SynchronousQueue

def toStream[A](data: (A=>Unit)=>Unit):Stream[A] = reset {
    val queue = new SynchronousQueue[Option[A]]
    queue.put(Some(shift { k: (A=>Unit) =>
        new Thread() { 
            override def run() {
                data(k)
                // when (if) the data source stops pumping, add None 
                // to signal that the stream is dead
                queue.put(None)
            }
        }.start()
        continually(queue.take).takeWhile(_.isDefined).map(_.get)
    })
}
Tom Crockett