3

I wanted to control the number of threads in ExecutionContext. So I created a instance of ThreadPoolExecutor and then created ExecutionContext from it.

And I created some Futures and attached onSuccess callbacks on them. I expected each onSuccess callback was called when each Future work finished. But I found all onSuccess callbacks were executed at the same time.

import java.util.concurrent.{ Executors, ForkJoinPool }

import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration.Duration

object Main extends App {
  implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
  // implicit val ec = ExecutionContext.fromExecutorService(new ForkJoinPool(2))

  val start = System.currentTimeMillis()

  val futures = for {
    i <- 1 to 10
  } yield Future[Int] {
    Thread.sleep(i * 1000)
    i
  }

  futures.foreach { f =>
    f.onSuccess { case i =>
      println(s"${i} Success. ${System.currentTimeMillis() - start}ms elapsed.")
    }
  }

  Await.ready(Future.sequence(futures.toList), Duration.Inf)
  ec.shutdown()
}

// ThreadPoolExecutor Result
// 1 Success. 25060ms elapsed.
// 2 Success. 25064ms elapsed.
// 3 Success. 25064ms elapsed.
// 4 Success. 25064ms elapsed.
// 5 Success. 25064ms elapsed.
// 6 Success. 25064ms elapsed.
// 7 Success. 25065ms elapsed.
// 8 Success. 25065ms elapsed.
// 9 Success. 25065ms elapsed.
// 10 Success. 30063ms elapsed.

// ForkJoinPool Result
// 1 Success. 1039ms elapsed.
// 2 Success. 2036ms elapsed.
// 3 Success. 4047ms elapsed.
// 4 Success. 6041ms elapsed.
// 5 Success. 12042ms elapsed.
// 6 Success. 12043ms elapsed.
// 7 Success. 25060ms elapsed.
// 8 Success. 25060ms elapsed.
// 9 Success. 25060ms elapsed.
// 10 Success. 30050ms elapsed.

The result above was printed at the same time not respectively. But when I use ForkJoinPool instead of ThreadPoolExecutor this problem is mitigated. Did I misuse ExecutionContext and Future?

edited: I found the problem happens when the number of threads is less than the number of futures. So I've edited above code to reproduce the problem and print the execution time.

I think future callback should be called on time even if the number of threads is small...

Mario Galic
  • 47,285
  • 6
  • 56
  • 98
jyshin
  • 841
  • 1
  • 8
  • 15
  • You need to post exact code you execute. What you've pasted is incomplete, and doesn't produce the output you describe. In fact, for me everything work as you expect, not as what you see. – Haspemulator Jan 05 '17 at 12:57
  • I just edited the question. The problem happens when the number of threads is less than the futures... – jyshin Jan 06 '17 at 02:41
  • Do you want/need to mark each Future as `blocking`, per [this StackOverflow post](http://stackoverflow.com/questions/19681389/use-case-of-scala-concurrent-blocking)? – Castaglia Jan 06 '17 at 03:45
  • No. I want that onSuccess callback executed at the time when future is done. – jyshin Jan 06 '17 at 04:43

1 Answers1

0

I eventually knew that Future callbacks(onComplete or onSuccess) are executed on the thread of the provided ExecutionContext. So If there are no idle threads in the pool, callback could not be executed. See scala.concurrent.Future

But still I don't understand behavior of ForkJoinPool. I need to study about that.

jyshin
  • 841
  • 1
  • 8
  • 15
  • `ForkJoinPool` uses 2*(CPU logical cores) threads by default. Replace your `nThreads` with the same number, and you'll get the same results with `FixedThreadPool`. – Haspemulator Jan 06 '17 at 10:02