
I'm writing Scala <-> Java interop wrappers for Futures and I don't know the right way to implement scala.concurrent.Future.onComplete (http://www.scala-lang.org/api/current/index.html#scala.concurrent.Future). This probably works:

def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
  executor.execute(new Runnable {
    @tailrec
    def run = value match {
      case Some(t) => func(t)
      case None => { Thread.sleep(100); run }
    }
  })
}

but "Asynchronous IO in Scala with futures" suggests that when I have to block, I should pass the relevant part of the code to scala.concurrent.blocking to let the ExecutionContext know what's up. The problem is that when I surround the value match {...} with blocking {}, the recursive call is no longer a tail call.

What's the proverbial right way to do this?

Edit: for completeness here is the entire wrapping class:

class JavaFutureWrapper[T](val jf: java.util.concurrent.Future[T]) extends scala.concurrent.Future[T] {
  def isCompleted = jf.isDone

  def result(atMost: Duration)(implicit permit: CanAwait): T =
    atMost match { case Duration(timeout, units) => jf.get(timeout, units) }

  def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
    executor.execute(new Runnable {
      @tailrec
      def run = value match {
        case Some(t) => func(t)
        case None => { Thread.sleep(100); run }
      }
    })
  }

  def ready(atMost: Duration)(implicit permit: CanAwait): this.type = atMost match {
    case Duration(timeout, units) => {
      jf.get(timeout, units)
      this
    }
  }

  def value: Option[Try[T]] = (jf.isCancelled, jf.isDone) match {
    case (true, _) => Some(Failure(new Exception("Execution was cancelled!")))
    case (_, true) => Some(Success(jf.get))
    case _ => None
  }
}
teryret
  • Can you indicate what is Java API? Where does `value` come from, for example? And why are you putting the thread to sleep for 100ms? In other words, where is your potentially blocking code? – 0__ Aug 21 '13 at 22:29
  • I've pasted the entire wrapping class for you. The sleep is to prevent tight looping as I poll for whether or not the java future is done yet. The whole run method will block until jf decides it is either done or canceled. – teryret Aug 22 '13 at 01:03
  • What if you only surround the Thread.sleep() with blocking? Another idea is to use a while loop instead of a recursive call. – Luciano Aug 22 '13 at 12:12
  • Oh snap, I hadn't thought of only surrounding the sleep, I think we may have a winner. ExecutionContexts should be smart enough to handle rapidly coming out of and then right back into blocking sections, right? – teryret Aug 22 '13 at 15:30
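The two suggestions in the comments above (mark only the sleep as blocking, and loop instead of recursing) can be combined. What follows is a minimal sketch, not the code anyone in this thread settled on: `PollingSketch` and `pollOnComplete` are hypothetical names, and the 100 ms interval is simply carried over from the question.

```scala
import java.util.concurrent.{Future => JFuture}
import scala.concurrent.{blocking, ExecutionContext}
import scala.util.Try

object PollingSketch {
  // Hypothetical helper: polls `jf` until it is done, then hands the outcome to `func`.
  def pollOnComplete[T, U](jf: JFuture[T])(func: Try[T] => U)(implicit executor: ExecutionContext): Unit =
    executor.execute(new Runnable {
      def run(): Unit = {
        // A plain while loop sidesteps the tail-call question entirely.
        while (!jf.isDone) {
          // Only the sleep is marked as blocking, not the whole polling loop.
          blocking { Thread.sleep(100) }
        }
        // isDone is also true after failure or cancellation; jf.get then throws,
        // and Try turns that exception into a Failure.
        func(Try(jf.get))
      }
    })
}
```

Called as `PollingSketch.pollOnComplete(jf)(r => println(r))`, this still occupies a thread for the whole wait, so it mainly illustrates where `blocking` can go rather than a reason to prefer polling.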

2 Answers


I would just wait for the Java future to complete:

import scala.util.{Try, Success, Failure}
import scala.concurrent._
import java.util.concurrent.TimeUnit

class JavaFutureWrapper[T](val jf: java.util.concurrent.Future[T])
  extends scala.concurrent.Future[T] {
  ...

  def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit =
    executor.execute(new Runnable {
      def run: Unit = {
        val result = Try(blocking(jf.get(Long.MaxValue, TimeUnit.MILLISECONDS)))
        func(result)
      }
    })
  ...
}
0__
  • Oh, right, yeah, that's clearly the right answer, thanks! I've sent you an edit that simplifies the code a bit, once that's in I'll mark this one as the answer. – teryret Aug 22 '13 at 15:47
  • The edit changes the meaning. You put the `blocking` around the execution of `func`. But it is the responsibility of `func` to declare whether it is doing something that blocks or not. So I think this version is correct, block only while waiting for `jf` to complete. – 0__ Aug 22 '13 at 16:00
  • Good point, here's another version of the edit that doesn't have that problem – teryret Aug 22 '13 at 16:02

Hmm, my edit to 0__'s answer didn't get approved, so for the sake of future readers, here's the solution I'm going with (simplified from 0__'s):

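def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
  executor.execute(new Runnable {
    // Explicit Unit result type so that run matches Runnable.run; the value returned by func is discarded.
    def run(): Unit = func(Try(blocking { jf.get }))
  })
}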
teryret
  • Yes, `.get` without timeout seems to wait indefinitely. – 0__ Aug 24 '13 at 12:33
  • Indefinitely as opposed to roughly 300 million years? Yeah, somehow I'm okay with that trade off. Plus if after 300 million years the future still hasn't arrived I'd expect onComplete to not indicate completion. – teryret Aug 24 '13 at 17:30
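For readers who only need a Scala future and not a hand-written subclass, the same `blocking { jf.get }` idea can be handed to `Future.apply`. This is a sketch of an alternative, not what either answer above does; `JavaFutureConversions` and `asScala` are hypothetical names, not part of the Scala standard library.

```scala
import java.util.concurrent.{Future => JFuture}
import scala.concurrent.{blocking, ExecutionContext, Future}

object JavaFutureConversions {
  // Hypothetical helper: runs the blocking wait once on the given ExecutionContext
  // and exposes the outcome as an ordinary scala.concurrent.Future.
  def asScala[T](jf: JFuture[T])(implicit executor: ExecutionContext): Future[T] =
    Future {
      // Only the wait on the Java future is marked as blocking.
      blocking { jf.get }
    }
}
```

One difference in design: here a single task waits on `jf` no matter how many callbacks are later registered on the returned future, whereas the `onComplete` implementations above submit one waiting task per registered callback. If the Java task fails, the exception thrown by `jf.get` should end up in the returned future as a failure.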