I want to generate an Observable in real time from the results of a list of Futures.
In the simplest case, suppose I have a list of futures I'm running with Future.sequence, and I'm simply monitoring their progress with an Observable that tells me each time one has completed. I'm basically doing it like this:

def observeFuturesProgress(futs: List[Future[Int]]): Observable[String] = {
  Observable[String](observer => {
    val loudFutures: List[Future[Int]] = futs.map(f => {
      f onComplete {
        case Success(a) => observer.onNext(s"just did $a more")
        case Failure(e) => observer.onError(e)
      }
      f
    })
    Future.sequence(loudFutures) onComplete {
      case Success(_) => observer.onCompleted()
      case Failure(e) => observer.onError(e)
    }
  })
}

This works fine in my testing environment. But I've just read that onNext shouldn't be called from different threads, at least without being careful that there are no overlapping calls. What is the recommended way to fix this? It seems that many real-world uses of Observables would require onNext to be called from async code like this, but I can't find a similar example in the docs.
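
To make the "no overlapping calls" constraint concrete, here is a minimal sketch of one way the calls could be serialized. It uses only the Scala standard library. SimpleObserver, SerializedObserver and SerializedDemo are hypothetical names invented for illustration; they stand in for the Rx observer and are not part of the RxScala API. The idea is to funnel every notification through a single lock and to ignore anything that arrives after a terminal event.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical stand-in for an Rx-style observer (not the RxScala type).
trait SimpleObserver[T] {
  def onNext(value: T): Unit
  def onError(error: Throwable): Unit
  def onCompleted(): Unit
}

// Wraps an observer so that calls never overlap and nothing is
// delivered after a terminal event (onError or onCompleted).
final class SerializedObserver[T](underlying: SimpleObserver[T])
    extends SimpleObserver[T] {
  private var done = false // guarded by this object's monitor

  def onNext(value: T): Unit = synchronized {
    if (!done) underlying.onNext(value)
  }
  def onError(error: Throwable): Unit = synchronized {
    if (!done) { done = true; underlying.onError(error) }
  }
  def onCompleted(): Unit = synchronized {
    if (!done) { done = true; underlying.onCompleted() }
  }
}

object SerializedDemo {
  def run(): List[String] = {
    val events = ListBuffer.empty[String]
    val sink = new SerializedObserver[String](new SimpleObserver[String] {
      def onNext(value: String): Unit = events += value
      def onError(error: Throwable): Unit = events += s"error: ${error.getMessage}"
      def onCompleted(): Unit = events += "completed"
    })

    val futs: List[Future[Int]] = List(1, 2, 3).map(n => Future(n))

    // Notify inside map, so each derived future completes only after
    // its onNext call has returned.
    val notified = futs.map(_.map { a => sink.onNext(s"just did $a more"); a })

    // The terminal event is sent only after every derived future is done.
    val finished = Future.sequence(notified).andThen {
      case Success(_) => sink.onCompleted()
      case Failure(e) => sink.onError(e)
    }
    Await.ready(finished, 5.seconds)
    events.toList
  }
}
```

Calling SerializedDemo.run() returns the three progress messages in whatever order the futures finished, followed by "completed". Notifying inside map rather than in a separate onComplete callback is a deliberate choice in this sketch: a callback registered on the original future can still be pending when Future.sequence completes, in which case the completion signal could overtake the last progress message.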