1

Let's say we have a fake data source that returns the data it holds in batches:

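import scala.concurrent.Future
import scala.util.Random

class DataSource(size: Int) {
    private var s = 0
    implicit val g = scala.concurrent.ExecutionContext.global

    // Each call advances the batch counter and returns that batch asynchronously;
    // once `size` batches have been served, it returns an empty list.
    def getData(): Future[List[Int]] = {
        s = s + 1
        Future {
            Thread.sleep(Random.nextInt(s * 100))
            if (s <= size) {
                List.fill(100)(s)
            } else {
                List()
            }
        }
    }
}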
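import scala.util.{Failure, Success}

object Test extends App {
    val source = new DataSource(100)
    implicit val g = scala.concurrent.ExecutionContext.global

    def process(v: List[Int]): Unit = {
        println(v)
    }

    // Fetch one batch, process it, and fetch the next one only if it was non-empty.
    def next(f: (List[Int]) => Unit): Unit = {
        val fut = source.getData()
        fut.onComplete {
            case Success(v) => {
                f(v)
                v match {
                    case h :: t => next(f)
                    case Nil => () // empty batch: no more data, stop
                }
            }
            case Failure(e) => e.printStackTrace()
        }
    }

    next(process)

    // Keep the JVM alive while the asynchronous callbacks run.
    Thread.sleep(1000000000)
}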

I have my own implementation above, but parts of it are not pure. Ideally, I would like to wrap the Future for each batch into one big Future, with the wrapper Future succeeding when the last batch returns an empty list. My situation differs a little from this post: the next() there is a synchronous call, while mine is asynchronous.

Is what I want even possible? The next batch should be fetched only after the previous one has resolved, and whether to fetch it at all depends on the size of the list returned.

What's the best way to walk through this type of data source? Are there any existing Scala frameworks that provide the feature I am looking for? Are Play's Iteratee, Enumerator, and Enumeratee the right tools? If so, can anyone provide an example of how to use them to implement what I am looking for?

Edit: With help from chunjef, I tried his approach and it worked for me. I made one small change to his answer, though:

Source.fromIterator(() => Iterator.continually(source.getData()))
    .mapAsync(1)(f => f.filter(_.size > 0))
    .via(Flow[List[Int]].takeWhile(_.nonEmpty))
    .runForeach(println)

Can someone compare Akka Streams with Play Iteratees? Is it worth trying Iteratees as well?


Code snippet 1:

Source.fromIterator(() => Iterator.continually(ds.getData)) // line 1
    .mapAsync(1)(identity) // line 2
    .takeWhile(_.nonEmpty) // line 3
    .runForeach(println)   // line 4

Code snippet 2: Suppose getData depends on the output of another flow, and I would like to concatenate it with the flow below. However, this yields a "too many open files" error. I am not sure what causes it; if I understand correctly, mapAsync(1) should limit the parallelism to 1.

Flow[Int].mapConcat[Future[List[Int]]](c => {
  Iterator.continually(ds.getData(c)).to[collection.immutable.Iterable]
}).mapAsync(1)(identity).takeWhile(_.nonEmpty).runForeach(println)

4 Answers

1

The following is one way to achieve the same behavior with Akka Streams, using your DataSource class:

import scala.concurrent.Future
import scala.util.Random

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._

object StreamsExample extends App {
  implicit val system = ActorSystem("Sandbox")
  implicit val materializer = ActorMaterializer()

  val ds = new DataSource(100)

  Source.fromIterator(() => Iterator.continually(ds.getData)) // line 1
        .mapAsync(1)(identity) // line 2
        .takeWhile(_.nonEmpty) // line 3
        .runForeach(println)   // line 4
}

class DataSource(size: Int) {
  ...
}

A simplified line-by-line overview:

  • line 1: Creates a stream source that continually calls ds.getData if there is downstream demand.
  • line 2: mapAsync is a way to deal with stream elements that are Futures. In this case, the stream elements are of type Future[List[Int]]. The argument 1 is the level of parallelism: we specify 1 here because DataSource internally uses a mutable variable, and a parallelism level greater than one could produce unexpected results. identity is shorthand for x => x, which basically means that for each Future, we pass its result downstream without transforming it.
  • line 3: Essentially, ds.getData is called as long as the result of the Future is a non-empty List[Int]. If an empty List is encountered, processing is terminated.
  • line 4: runForeach here takes a function List[Int] => Unit and invokes that function for each stream element.
Jeffrey Chung
  • getData returns an empty list once the data runs out. In that case, should identity be a filter? Otherwise, how do I stop the flow at the first empty list returned by getData? – Tododo Chen Aug 27 '17 at 02:20
  • @TododoChen: As I mentioned in the answer, `.takeWhile(_.nonEmpty)` will complete the stream if an empty `List` is encountered. `identity` should not be replaced with a filter. – Jeffrey Chung Aug 27 '17 at 11:41
  • Can you please take a look at my edit in the question? What is the difference between the two code snippets? How do I keep the iterator from running too fast? I assume there is a setting somewhere in the actor system for the number of actors that kick off the data fetch? – Tododo Chen Aug 30 '17 at 08:28
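
The difference between `takeWhile` and a filter discussed in these comments can be tried out on a plain Scala iterator, without any stream library. The following is only a minimal sketch: `batches` is a hypothetical stand-in for the data source (three non-empty batches followed by empty lists forever), not part of the original code.

```scala
object TakeWhileVsFilter {
  // Hypothetical stand-in for the data source: three non-empty batches,
  // then empty lists forever.
  def batches: Iterator[List[Int]] =
    Iterator(List(1), List(2), List(3)) ++ Iterator.continually(List.empty[Int])

  // takeWhile ends the iteration at the first empty batch.
  val taken: List[List[Int]] = batches.takeWhile(_.nonEmpty).toList

  // filter only drops empty batches; it never signals the end, so the
  // iteration has to be bounded by hand (here with take(2)) to terminate.
  val filtered: List[List[Int]] = batches.filter(_.nonEmpty).take(2).toList
}
```

Calling `batches.filter(_.nonEmpty).toList` without the `take` would never return, which is why a filter alone cannot stop the flow.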
0

Ideally, I would like to wrap the Future for each batch into one big Future, with the wrapper Future succeeding when the last batch returns an empty list.

I think you are looking for a Promise.

You would set up a Promise before you start the first iteration.

This gives you promise.future, a Future that you can then use to follow the completion of everything.

In your onComplete, you add a case _ => promise.success(()).

Something like

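import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}

// Assumes `source` and an implicit ExecutionContext are in scope, as in the question.
def loopUntilDone(f: (List[Int]) => Unit): Future[Unit] = {
  val promise = Promise[Unit]()

  def next(): Unit = source.getData().onComplete {
    case Success(v) =>
      f(v)
      v match {
        case _ :: _ => next()              // non-empty batch: fetch the next one
        case _ => promise.success(())      // empty batch: everything is done
      }
    case Failure(e) => promise.failure(e)  // propagate the first failure
  }

  // get going
  next()

  // return the Future for everything
  promise.future
}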


// future for everything, this is a `Future[Unit]`
// its `onComplete` will be triggered when there is no more data
val everything = loopUntilDone(process)
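
The same sequencing can also be sketched without an explicit Promise, by chaining each fetch with `flatMap`. This is a minimal sketch under stated assumptions, not the approach from the question: `FakeSource` is a hypothetical, smaller stand-in for `DataSource`, and `run` exists only to collect the batches for inspection.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FlatMapLoop {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for DataSource: batches 1..size, then an empty list.
  final class FakeSource(size: Int) {
    private var s = 0
    def getData(): Future[List[Int]] = {
      s += 1
      val batch = s // capture the counter before going async
      Future(if (batch <= size) List.fill(3)(batch) else Nil)
    }
  }

  // Fetch a batch, process it, and recurse only while batches are non-empty.
  // The returned Future completes after the empty batch has been processed.
  def loopUntilDone(source: FakeSource)(f: List[Int] => Unit): Future[Unit] =
    source.getData().flatMap { batch =>
      f(batch)
      if (batch.nonEmpty) loopUntilDone(source)(f) else Future.successful(())
    }

  // Collects every batch passed to the callback, including the final empty one.
  def run(): List[List[Int]] = {
    val seen = scala.collection.mutable.ListBuffer.empty[List[Int]]
    Await.result(loopUntilDone(new FakeSource(3)) { batch => seen += batch; () }, 10.seconds)
    seen.toList
  }
}
```

With `flatMap`, a failed fetch fails the returned Future without a separate `Failure` case, and the next call to `getData` is still made only after the previous batch has arrived.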
Thilo
  • Yeah, I have thought of that. But I think there has to be a more elegant way, so that the client code can walk through the data as if calling foreach or next on something iterator-like? – Tododo Chen Aug 25 '17 at 11:35
  • You mean you want an `Iterator[Future[Int]]` that gives you the unbatched individual elements? I don't think you can do that, because you will have to wait for the next batch to arrive before the iterator can know if there should be another element or not. – Thilo Aug 25 '17 at 11:39
  • You may want to look at reactive stream processing for alternative approaches. – Thilo Aug 25 '17 at 11:42
  • I want something like `val i = new MyIteratorLike(s: DataSource)` and then `val l: List[Future[List[Int]]] = i.foreach(f: () => Unit)` – Tododo Chen Aug 25 '17 at 11:44
  • That cannot work, because you need to be able to iterate through a `List` without blocking, and your `List` does not know if there is another batch before that batch arrived. – Thilo Aug 25 '17 at 11:46
  • In my case, is this even possible when fetching the next batch depends on the size of the previous batch's list? – Tododo Chen Aug 25 '17 at 11:47
  • If you knew that there are exactly 10 batches, you could do this (even including staggered fetches). – Thilo Aug 25 '17 at 11:47
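
The fixed-count case mentioned in the last comment might look like the sketch below. It assumes a hypothetical `fetchBatch(i)` that can be called for any batch index up front, which is exactly what the question's data source does not offer.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FixedBatches {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical fetch: batch i is addressable up front, so no fetch depends
  // on the result of the previous one.
  def fetchBatch(i: Int): Future[List[Int]] = Future(List.fill(2)(i))

  // With a known count, all Futures can be created eagerly and then combined
  // into one Future that completes when every batch has arrived.
  def fetchAll(n: Int): Future[List[List[Int]]] = {
    val perBatch: List[Future[List[Int]]] = (1 to n).toList.map(fetchBatch)
    Future.sequence(perBatch)
  }

  def run(): List[List[Int]] = Await.result(fetchAll(10), 10.seconds)
}
```

When the number of batches is only discovered by receiving an empty list, this list of Futures cannot be built in advance, which is the limitation described in the comments.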
0

You are probably looking for a reactive streams library. My personal favorite (and the one I'm most familiar with) is Monix. This is how it works with DataSource unchanged:

import scala.concurrent.duration.Duration
import scala.concurrent.Await

import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global

object Test extends App {
    val source = new DataSource(100)
    val completed = // <- this is Future[Unit], completes when foreach is done
        Observable.repeat(Observable.fromFuture(source.getData()))
            .flatten // <- Here it's Observable[List[Int]], it has collection-like methods
            .takeWhile(_.nonEmpty)
            .foreach(println)

    Await.result(completed, Duration.Inf)
}
Oleg Pyzhcov
  • Thanks a lot. Why did you choose Monix over Akka Streams? – Tododo Chen Aug 26 '17 at 13:19
  • @TododoChen Personal preference, mostly. I use Monix `Task` both as the IO monad for `cats` and as a replacement for `Future`, so it's more convenient. Also, I find Monix somewhat easier to understand. – Oleg Pyzhcov Aug 26 '17 at 16:45
0

I just figured out that flatMapConcat achieves what I wanted. There is no point in starting another question since I already have the answer, so I am putting my sample code here in case someone is looking for something similar.

This type of API is very common in integrations between traditional enterprise applications. The DataSource mocks the API, while the Test object demonstrates how client code can use Akka Streams to consume it.

In my small project the API was provided over SOAP, and I used scalaxb to turn the SOAP interface into async-style Scala. With the client calls demonstrated in the Test object, we can consume the API with Akka Streams. Thanks to all for the help.

import scala.collection.mutable
import scala.concurrent.Future
import scala.util.Random

class DataSource(size: Int) {
    private var transactionId: Long = 0
    private val transactionCursorMap: mutable.HashMap[TransactionId, Set[ReadCursorId]] = mutable.HashMap.empty
    private val cursorIteratorMap: mutable.HashMap[ReadCursorId, Iterator[List[Int]]] = mutable.HashMap.empty
    implicit val g = scala.concurrent.ExecutionContext.global

    case class TransactionId(id: Long)

    case class ReadCursorId(id: Long)

    def startTransaction(): Future[TransactionId] = {
        Future {
            synchronized {
                transactionId += 1
            }
            val t = TransactionId(transactionId)
            transactionCursorMap.update(t, Set(ReadCursorId(0)))
            t
        }
    }

    def createCursorId(t: TransactionId): ReadCursorId = {
        synchronized {
            val c = transactionCursorMap.getOrElseUpdate(t, Set(ReadCursorId(0)))
            val currentId = c.foldLeft(0L) { (acc, a) => acc.max(a.id) }
            val cId = ReadCursorId(currentId + 1)
            transactionCursorMap.update(t, c + cId)
            cursorIteratorMap.put(cId, createIterator)
            cId
        }
    }

    def createIterator(): Iterator[List[Int]] = {
        (for {i <- 1 to 100} yield List.fill(100)(i)).toIterator
    }

    def startRead(t: TransactionId): Future[ReadCursorId] = {
        Future {

            createCursorId(t)
        }
    }

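    def getData(cursorId: ReadCursorId): Future[List[Int]] = {
        Future {
            Thread.sleep(Random.nextInt(100))
            // Lock inside the Future body: with mapAsync(5) several reads can
            // run concurrently, and the iterator is not thread-safe.
            synchronized {
                cursorIteratorMap.get(cursorId) match {
                    // Return an empty list once the cursor is exhausted so that
                    // takeWhile(_.nonEmpty) can complete the stream.
                    case Some(i) if i.hasNext => i.next()
                    case _ => List()
                }
            }
        }
    }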


}


object Test extends App {
    val source = new DataSource(10)
    implicit val system = ActorSystem("Sandbox")
    implicit val materializer = ActorMaterializer()
    implicit val g = scala.concurrent.ExecutionContext.global

    val s = Source.fromFuture(source.startTransaction())
      .map { e =>
          source.startRead(e)
      }
      .mapAsync(1)(identity)
      .flatMapConcat(
          e => {
              Source.fromIterator(() => Iterator.continually(source.getData(e)))
          })
      .mapAsync(5)(identity)
      .via(Flow[List[Int]].takeWhile(_.nonEmpty))
      .runForeach(println)


    /*
      val done = Source.fromIterator(() => Iterator.continually(source.getData())).mapAsync(1)(identity)
        .via(Flow[List[Int]].takeWhile(_.nonEmpty))
        .runFold(List[List[Int]]()) { (acc, r) =>
          //      println("=======" + acc + r)
          r :: acc
        }

      done.onSuccess {

        case e => {
          e.foreach(println)
        }

      }
      done.onComplete(_ => system.terminate())
    */
}