EDIT:
Your solution is correct as far as I can tell. If you want to improve performance a bit, consider:
val observableOut: Observable[Out] = observableIn.flatMap { in =>
val p = Promise.successful(in)
Observable.from(futureInToFutureOut(p.future))
}
This is a bit faster, as it does not create an asynchronous computation to map the future like Future.apply
does.
OLD:
I am leaving my old suggestion below, which works only in the case that you are mapping a single event in the Observable
.
import scala.concurrent._
val promiseIn = Promise[Int]()
observableIn.foreach(x => promiseIn.trySuccess(x))
val observableOut = Observable.create { observer =>
promiseIn.future.map(futureInToFutureOut).foreach { y =>
observer.onNext(y)
observer.onCompleted()
}
}
Explanation
Since you are starting out with an Observable[In]
object (i.e. an event stream), you need to find a way to transfer an event from that Observable
to a future. The typical way to create a new future is to create its Promise
first -- the input side of the future object. You then use foreach
on the Observable
to invoke trySuccess
on the future when the first event arrives:
observableIn ---x---> foreach ---x---> promiseIn.trySuccess(x)
Once the event on the Observable
arrives, the promise will be asynchronously completed. We can now get the reading side of the promise, i.e. the future by calling its future
method; and then call map
on the future -- promiseIn.future.map(futureInToFutureOut)
. Graphically:
promiseIn.future ---x---> map ---f(x)---> futureOut
The resulting future is asynchronously completed with futureInToFutureOut(x)
. At this point we need to find a way to emit this value back through an Observable[Out]
. A typical way to create new Observable
is to call the Observable.create
factory method. This method gives as the Observable
's writing end -- the Observer
, which we use to emit events to by calling onNext
:
futureOut ---f(x)---> foreach ---f(x)---> observer.onNext(f(x))
Since we know that a future emits at most a single event, we call the onCompleted
on the observer, to close the output Observable
.
Edit: if you want to master Rx and Scala futures, you may want to consider this book, which deals with these topics. Disclaimer: I'm the author.