
I have some data that I pull from a remote API, which I access through a Future-based interface. The data is structured as a linked list. A relevant example data container is shown below.

import scala.concurrent.Future

case class Data(information: Int) {
    def hasNext: Boolean = ??? // Implemented
    def next: Future[Data] = ??? // Implemented
}

Now I'm interested in adding some functionality to the data class, such as map, foreach, reduce, etc. To do so, I want to implement some form of IterableLike so that it inherits these methods. Below is the trait that Data could extend to get this behaviour.

import scala.collection.{IterableLike, mutable}
import scala.concurrent.Future

trait AsyncIterable[+T]
    extends IterableLike[Future[T], AsyncIterable[T]]
{
    def hasNext : Boolean
    def next : Future[T]

    // How to implement?
    override def iterator: Iterator[Future[T]] = ???
    override protected[this] def newBuilder: mutable.Builder[Future[T], AsyncIterable[T]] = ???
    override def seq: TraversableOnce[Future[T]] = ???
}

It should be a non-blocking implementation that, when acted on, starts requesting the next data from the remote data source. It is then possible to do cool stuff such as:

case class Data(information: Int) extends AsyncIterable[Data]
val data = Data(1) // And more, of course
// Asynchronously print all the information.
data.foreach(data => println(data.information))

It is also acceptable for the interface to be different. But the result should in some way represent asynchronous iteration over the collection. Preferably in a way that is familiar to developers, as it will be part of an (open source) library.
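Without any library, the pull-based traversal described above can be sketched with plain `scala.concurrent` futures: request the next node only after the current one has been processed, and return a `Future` that completes when the end of the list is reached. This is a minimal sketch, not a full collection API; the extra `last` field and the simulated `next` are hypothetical stand-ins for the real remote call.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for Data: `last` exists only to make the simulated
// remote list finite, and `next` returns an already-completed future.
final case class Data(information: Int, last: Int) {
  def hasNext: Boolean = information < last
  def next: Future[Data] = Future.successful(Data(information + 1, last))
}

object AsyncData {
  // Calls `f` on each node in order. The next node is requested only after
  // `f` has run on the current one; nothing blocks, and the returned future
  // fails if any fetch fails.
  def foreachAsync(head: Data)(f: Data => Unit)(implicit ec: ExecutionContext): Future[Unit] = {
    f(head)
    if (head.hasNext) head.next.flatMap(n => foreachAsync(n)(f))
    else Future.unit
  }

  // A left fold built the same way; map, reduce, etc. can follow the same pattern.
  def foldLeftAsync[B](head: Data, z: B)(op: (B, Data) => B)(implicit ec: ExecutionContext): Future[B] = {
    val acc = op(z, head)
    if (head.hasNext) head.next.flatMap(n => foldLeftAsync(n, acc)(op))
    else Future.successful(acc)
  }
}

// Usage: Await.result(AsyncData.foreachAsync(Data(1, 3))(d => println(d.information)), 5.seconds)
```

Because each `flatMap` only fires once the previous fetch has completed, no request is made before the consumer is ready for it, which matches the requirement that the source should not produce data without someone asking for it.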

Calavoow
  • Do you really want to stick with the `Iterator` interface? You may consider using (or implementing) something similar to [Rx Observable](http://reactivex.io/rxscala/). In fact, async Observable is [dual](http://csl.stanford.edu/~christos/pldi2010.fit/meijer.duality.pdf) to sync Iterable. – 4e6 Jun 09 '15 at 16:55
  • Yes, that is a good point, it does not have to be this specific interface. The only requirement is that the interface over the collection is asynchronous. However, it is not a data source which _produces_ information by itself, it should not start producing events without a subscriber asking for them. – Calavoow Jun 09 '15 at 17:00
  • You might want to look at scalaz. You may be able to do something with `ListT[Future, Data]` – Daenyth Jun 09 '15 at 17:12
  • Take a look at Twitter's [`Spool`](https://twitter.github.io/util/docs/index.html#com.twitter.concurrent.Spool), which is basically an asynchronous version of `Stream`. – Travis Brown Jun 09 '15 at 17:56
  • @TravisBrown I've been able to implement something that looks nice on Spool, however the Futures used within Spool are of the Twitter kind (i.e. `com.twitter.util.Future`), which do not mesh well with Scala Futures. I have taken the implicit conversions posted at http://stackoverflow.com/questions/30317473/convert-scala-future-to-twitter-future, but are there any plans to change this to Scala futures? – Calavoow Jun 27 '15 at 14:21
  • @Calavoow We've discussed it, but while the APIs are similar, there are some [important semantic differences](https://lobste.rs/s/zjsnw0/building_products_at_soundcloud_microservices_in_scala_and_finagle), and it's not clear at this point that having Twitter's futures implement the Scala trait would be practical or valuable, so the officially supported conversions in Bijection are the way to go for now. – Travis Brown Jun 27 '15 at 17:13
  • @TravisBrown Do you have some kind of example where the Spool tail is only filled when necessary? I'm going off the documentation on https://twitter.github.io/util/docs/index.html#com.twitter.concurrent.Spool, but there it is assumed there is some asynchronous process continually emitting items, instead I want to calculate the next tail item lazily. – Calavoow Jan 06 '16 at 16:55

2 Answers


In production I would use one of the following:

  1. Akka Streams
  2. Reactive Extensions

For private experiments I would implement something similar to the following (explanations are below).

I have modified your Data slightly:

import scala.concurrent.Future

abstract class AsyncIterator[T] extends Iterator[Future[T]] {
  def hasNext: Boolean
  def next(): Future[T]
}

On top of it we can implement this Iterable:

import scala.collection.IterableLike
import scala.concurrent.Future

class AsyncIterable[T](sourceIterator: AsyncIterator[T])
  extends IterableLike[Future[T], AsyncIterable[T]]
{
  // A Stream evaluates its tail lazily and memoizes every element it has produced
  private def stream(): Stream[Future[T]] =
    if(sourceIterator.hasNext) {sourceIterator.next() #:: stream()} else {Stream.empty}
  val asStream = stream()

  override def iterator = asStream.iterator
  override def seq = asStream.seq
  override protected[this] def newBuilder = throw new UnsupportedOperationException()
}

We can see it in action with the following code:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global // needed by Future { ... } and Future#foreach

object Example extends App {
  val source = "Hello World!"

  val iterator1 = new DelayedIterator[Char](100L, source.toCharArray)
  new AsyncIterable(iterator1).foreach(_.foreach(print)) //prints 1 char per 100 ms
  pause(2000L)

  val iterator2 = new DelayedIterator[String](100L, source.toCharArray.map(_.toString))
  new AsyncIterable(iterator2).reduceLeft((fl: Future[String], fr) =>
    for(l <- fl; r <- fr) yield {println(s"$l+$r"); l + r}) //prints 1 line per 100 ms
  pause(2000L)

  def pause(duration: Long) = {println("->"); Thread.sleep(duration); println("\n<-")}
}

class DelayedIterator[T](delay: Long, data: Seq[T]) extends AsyncIterator[T] {
  private val dataIterator = data.iterator
  private var nextTime = System.currentTimeMillis() + delay
  override def hasNext = dataIterator.hasNext
  override def next() = {
    val thisTime = math.max(System.currentTimeMillis(), nextTime)
    val thisValue = dataIterator.next()
    nextTime = thisTime + delay
    Future {
      val now = System.currentTimeMillis()
      // Quick and dirty: this blocks a pool thread; your implementation will be better
      if(thisTime > now) Thread.sleep(thisTime - now)
      thisValue
    }
  }
}

Explanation

AsyncIterable uses Stream because it is evaluated lazily and is simple to use.

Pros:

  • simplicity
  • multiple calls to the iterator and seq methods return the same items, because the Stream memoizes them.

Cons:

  • it could exhaust memory, because the Stream keeps all previously obtained values.
  • the first value is requested eagerly when the AsyncIterable is created.
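One way to avoid both drawbacks is to keep a factory instead of a memoized Stream and hand out a fresh AsyncIterator on every traversal. This sketch assumes the source can be re-read from the start; `ReiterableAsync` is a hypothetical name. Nothing is requested until a traversal begins and nothing is retained between traversals, at the cost of re-requesting the data on each pass.

```scala
import scala.concurrent.Future

// Same shape as the AsyncIterator above.
abstract class AsyncIterator[T] extends Iterator[Future[T]] {
  def hasNext: Boolean
  def next(): Future[T]
}

// Hypothetical alternative to the Stream-backed AsyncIterable: every call to
// `iterator` creates a fresh source, so no elements are memoized and no
// request is made until a traversal actually starts.
final class ReiterableAsync[T](newSource: () => AsyncIterator[T]) extends Iterable[Future[T]] {
  override def iterator: Iterator[Future[T]] = newSource()
}
```

Extending the plain `Iterable` (only `iterator` has to be defined) still gives `foreach`, `map`, `foldLeft` and friends over the `Future[T]` elements.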

DelayedIterator is a very simplistic implementation of AsyncIterator; please excuse the quick-and-dirty code.
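The `Thread.sleep` inside DelayedIterator parks a pool thread for the whole delay. A non-blocking alternative, sketched here with the JDK's `ScheduledExecutorService`, completes a `Promise` once the delay has elapsed. `Delay.after` is a hypothetical helper, not part of any library.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object Delay {
  // One daemon thread only schedules completions; it never sleeps on behalf of callers.
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "delay-scheduler")
        t.setDaemon(true)
        t
      }
    })

  // Returns a future that completes with `value` after roughly `millis` ms.
  // Negative delays are treated as "run as soon as possible".
  def after[T](millis: Long)(value: => T): Future[T] = {
    val p = Promise[T]()
    scheduler.schedule(new Runnable {
      def run(): Unit = p.complete(Try(value))
    }, math.max(0L, millis), TimeUnit.MILLISECONDS)
    p.future
  }
}

// Usage: Await.result(Delay.after(100L)("done"), 1.second)
```

Inside DelayedIterator.next(), the `Future { ... Thread.sleep ... }` block could then become `Delay.after(thisTime - System.currentTimeMillis())(thisValue)`.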

It still seems strange to me to have a synchronous hasNext alongside an asynchronous next().
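One way to remove that asymmetry is to let a single asynchronous call answer both questions: `next()` returns `Future[Option[T]]`, with `None` marking the end. This is a hypothetical interface (JavaScript's async iterators follow a similar idea, returning a promise of `{value, done}`); `AsyncCursor`, `fromSeq` and `foldLeft` are illustrative names.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical fully asynchronous pull interface: there is no separate
// hasNext, so the caller never has to know synchronously whether more data exists.
trait AsyncCursor[T] {
  def next(): Future[Option[T]]
}

object AsyncCursor {
  // In-memory cursor standing in for a remote source.
  def fromSeq[T](xs: Seq[T]): AsyncCursor[T] = new AsyncCursor[T] {
    private val it = xs.iterator
    def next(): Future[Option[T]] = synchronized {
      Future.successful(if (it.hasNext) Some(it.next()) else None)
    }
  }

  // Drains a cursor sequentially: the next element is pulled only after
  // the previous one has been folded into the accumulator.
  def foldLeft[T, B](c: AsyncCursor[T], z: B)(op: (B, T) => B)(implicit ec: ExecutionContext): Future[B] =
    c.next().flatMap {
      case Some(t) => foldLeft(c, op(z, t))(op)
      case None    => Future.successful(z)
    }
}

// Usage: Await.result(AsyncCursor.foldLeft(AsyncCursor.fromSeq(List(1, 2, 3)), 0)(_ + _), 5.seconds)
```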

Oleg Rudenko
  • Thank you for your comment. I think you assume that `Data` is some form of iterator, but instead `Data` is an element of a singly linked-list like structure, with a pointer to the next element that is asynchronous. Thus `hasNext` and `next` should be called on the `Data` returned by `next` (after the Future completes). Did I understand you correctly? – Calavoow Sep 14 '15 at 16:30
  • @Calavoow exactly, I understood it so. Now I see that Data is a linked list. I will update my answer, but it could take some days. – Oleg Rudenko Sep 15 '15 at 08:32
  • No problem, I am very interested in a solution with Streams. Could you also elaborate on why you would use Akka Streams instead of the standard library Stream? I also thought of Rx, but the input is not a continuous input stream, rather it is pull based. I have to request the next `Data` from the server. Btw, the total amount of data is limited, so it is no problem to store it all in memory. – Calavoow Sep 15 '15 at 22:50
  • Your solution with Twitter Spool looks good and I see no reason to compete with it. I will keep my answer as an example for AsyncIterator. – Oleg Rudenko Sep 26 '15 at 10:52
  • The problem with using a `scala.collection.immutable.Stream` underneath is that it's going to memoize all the elements it sees, so the entire iteration will be stored in memory. In that case, you might as well just use a `Future[Seq[A]]`. – john sullivan Feb 05 '17 at 13:36
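Following that suggestion, and since the total amount of data is said to be small, the linked list can simply be collected into a `Future[Vector[...]]` and then processed with the ordinary collection methods. A minimal sketch; `Node` is a hypothetical stand-in for Data with a simulated `next`.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical node with the same shape as the question's Data;
// `remaining` only makes the simulated list finite.
final case class Node(information: Int, remaining: Int) {
  def hasNext: Boolean = remaining > 0
  def next: Future[Node] = Future.successful(Node(information + 1, remaining - 1))
}

object LinkedListOps {
  // Follows the links until hasNext is false. Nodes are prepended to a List
  // (cheap) and reversed once at the end.
  def collectAll(head: Node)(implicit ec: ExecutionContext): Future[Vector[Node]] = {
    def loop(n: Node, acc: List[Node]): Future[List[Node]] =
      if (n.hasNext) n.next.flatMap(m => loop(m, n :: acc))
      else Future.successful(n :: acc)
    loop(head, Nil).map(_.reverse.toVector)
  }
}

// Usage: LinkedListOps.collectAll(Node(1, 2)).map(_.map(_.information).sum)
```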

Using Twitter's Spool, I've implemented a working example, adapted from the example in the Spool documentation.

import com.twitter.concurrent.Spool
import com.twitter.util.{Await, Return, Promise}

import scala.concurrent.{ExecutionContext, Future}

trait AsyncIterable[+T <: AsyncIterable[T]] { self : T =>
    def hasNext : Boolean
    def next : Future[T]

    def spool(implicit ec: ExecutionContext) : Spool[T] = {
        // Completes `rest` once `currentPage` arrives, then keeps following the
        // links of the page that was just fetched (not those of `self`).
        def fill(currentPage: Future[T], rest: Promise[Spool[T]]): Unit = {
            currentPage foreach { cPage =>
                if(cPage.hasNext) {
                    val nextSpool = new Promise[Spool[T]]
                    rest() = Return(cPage *:: nextSpool)
                    fill(cPage.next, nextSpool)
                } else {
                    val emptySpool = new Promise[Spool[T]]
                    emptySpool() = Return(Spool.empty[T])
                    rest() = Return(cPage *:: emptySpool)
                }
            }
        }
        val rest = new Promise[Spool[T]]
        if(hasNext) {
            fill(next, rest)
        } else {
            rest() = Return(Spool.empty[T])
        }
        self *:: rest
    }
}

Data is the same as before (extending AsyncIterable[Data]), and now we can use it:

// Cool stuff
implicit val ec = scala.concurrent.ExecutionContext.global
val data = Data(1) // And others
// Print all the information asynchronously
val fut = data.spool.foreach(data => println(data.information))
Await.ready(fut)

It will throw an exception on the second element, because next is not actually implemented in this example.

Calavoow