
I want to generate an Observable in real time from the results of a list of Futures.

In the simplest case, suppose I have a list of futures I'm running with Future.sequence, and I'm simply monitoring their progress with an Observable that tells me each time one has completed. I'm basically doing it like this:

  def observeFuturesProgress(futs: List[Future[Int]]): Observable[String] = {
    Observable[String](observer => {
      val loudFutures: List[Future[Int]] = futs.map(f => {
        f onComplete {
          case Success(a) => observer.onNext(s"just did $a more")
          case Failure(e) => observer.onError(e)
        }
        f
      })
      Future.sequence(loudFutures) onComplete {
        case Success(_) => observer.onCompleted()
        case Failure(e) => observer.onError(e)
      }
    })
  }

This works fine in my testing environment. But I've just read that `onNext` shouldn't be called from different threads, at least not without making sure that the calls never overlap. What is the recommended way to fix this? It seems that many real-world uses of Observables would require `onNext` to be called from async code like this, but I can't find a similar example in the docs.

thund
  • I'm not sure if there's a better answer, but you can ensure that `onNext` calls are run by the same thread if you use for example a single threaded execution context (`ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())`) to run those `onComplete`s callbacks. – Kolmar Mar 12 '16 at 16:35
  • Can you link the article about `onNext` that you're referring to? This use case looks perfectly fine to me. – mavarazy Mar 12 '16 at 16:51
  • @mavarazy: Much of the documentation I've found on this is pretty unclear but [this](http://reactivex.io/documentation/operators/serialize.html) talks about using `serialize()` to avoid two overlapping `onNext()` calls, and [this](https://github.com/ReactiveX/RxJava/wiki/Subject) warns you not to call `onNext()` from multiple threads - at least if you're using a Subject. And all of the Rx official examples I could find are single-threaded. – thund Mar 12 '16 at 22:53

1 Answer


From The Observable Contract:

Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.

Take a look at the Serialize operator:

It is possible for an Observable to invoke its observers’ methods asynchronously, perhaps from different threads. This could make such an Observable violate the Observable contract, in that it might try to send an OnCompleted or OnError notification before one of its OnNext notifications, or it might make an OnNext notification from two different threads concurrently. You can force such an Observable to be well-behaved and synchronous by applying the Serialize operator to it.
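
Applied to the code in the question, a minimal sketch might look like the following. It assumes RxScala (`rx.lang.scala`) is on the classpath and relies on the `serialize` method that the asker reports compiles; the object name `SerializedProgress` is only illustrative. One change beyond adding `.serialize` is my own suggestion rather than something from the docs quoted above: the notification is sent inside `f.map` instead of a separate `onComplete` callback, so that `Future.sequence` cannot succeed until every `onNext` call has returned.

  import rx.lang.scala.Observable
  import scala.concurrent.{ExecutionContext, Future}
  import scala.util.{Failure, Success}

  // Hypothetical wrapper object, named only for this example.
  object SerializedProgress {
    def observeFuturesProgress(futs: List[Future[Int]])
                              (implicit ec: ExecutionContext): Observable[String] =
      Observable[String] { observer =>
        // Each mapped future completes only after its onNext call has returned.
        val loudFutures: List[Future[Int]] = futs.map { f =>
          f.map { a =>
            observer.onNext(s"just did $a more")
            a
          }
        }
        // Succeeds only once all mapped futures (and their onNext calls) are done.
        Future.sequence(loudFutures).onComplete {
          case Success(_) => observer.onCompleted()
          case Failure(e) => observer.onError(e)
        }
      }.serialize // guards against onNext calls overlapping across threads
  }

The `onNext` calls can still originate from different pool threads, which is why `.serialize` is kept. The messages arrive in completion order, so a test that collects them (for example with `toBlocking.toList`) should compare them without depending on order.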

MyDogTom
  • Thanks. When I'd quickly looked at the Serialize page before I'd been discouraged by the fact that the Rx-scala section just says TBD and doesn't give the actual syntax. I wasn't sure whether it had ever been implemented. However, the following syntax compiles: `Observable[String](observer => { ... }).serialize`, and it seems to run fine. I can't honestly tell yet whether `.serialize` does anything - for that I'll try building some tougher stress-tests. I'm also surprised it isn't the default implementation of `Observable`. – thund Mar 14 '16 at 03:21