
SHORT VERSION

I'm looking for a way to set, once and for all, which pool to use globally when I call the .par method on a collection...

Up to now I have only found how to set the number of threads on the global ExecutionContext, but not how to change the actual pool used by default.

I merely want to explicitly specify the ForkJoinPool to make the parallel collections ExecutionContext independent from the Scala version I use.


LONG VERSION

This requirement came up after we ran into issues because Scala 2.10 doesn't support JDK 1.8.

Scala simply didn't recognize the Java version and assumed we were still on 1.5; hence the pool was of a different type and the number of threads wasn't limited to the number of processors.

The problem is caused by this code:

if (scala.util.Properties.isJavaAtLeast("1.6")) new ForkJoinTaskSupport
else new ThreadPoolTaskSupport

def isJavaAtLeast(version: String) = {
    val okVersions = version match {
      case "1.5"    => List("1.5", "1.6", "1.7")
      case "1.6"    => List("1.6", "1.7")
      case "1.7"    => List("1.7")
      case _        => Nil
    }
    okVersions exists (javaVersion startsWith _)
  }
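To see why this check misclassifies JDK 1.8, here is a small standalone replica of it (with the runtime version passed in as a parameter, purely for illustration): since "1.8" appears in none of the whitelists, isJavaAtLeast("1.6") returns false on JDK 1.8, and the code above falls back to ThreadPoolTaskSupport as if we were on an ancient JVM.

```scala
object JavaVersionCheckDemo {
  // Replica of the 2.10.x whitelist check; the real method reads the
  // runtime version itself, here it is a parameter for demonstration.
  def isJavaAtLeast(javaVersion: String, version: String): Boolean = {
    val okVersions = version match {
      case "1.5" => List("1.5", "1.6", "1.7")
      case "1.6" => List("1.6", "1.7")
      case "1.7" => List("1.7")
      case _     => Nil
    }
    okVersions exists (javaVersion startsWith _)
  }

  def main(args: Array[String]): Unit = {
    println(isJavaAtLeast("1.7", "1.6")) // true: "1.7" is whitelisted
    println(isJavaAtLeast("1.8", "1.6")) // false: "1.8" matches nothing,
                                         // so 2.10 picks ThreadPoolTaskSupport
  }
}
```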

Since thread management is quite critical in our application and we don't want unexpected surprises just from changing a version, I wondered if it was possible to force Scala to use a ForkJoinPool with a preset number of threads, decided by us, GLOBALLY (I don't want the single-instance solution described here: Scala Parallel Collections: How to know and configure the number of threads).

hope it's clear enough!

gmcontessa
  • "Scala 2.10 doesn't support JDK 1.8" — eh? it doesn't? (are you thinking of 2.9?) – Seth Tisue Jun 01 '17 at 12:43
  • also have you seen https://stackoverflow.com/questions/17865823/how-do-i-set-the-default-number-of-threads-for-scala-2-10-parallel-collections? – Seth Tisue Jun 01 '17 at 12:45
  • It looks like there is no good solution so I suggest one more trick that works well only if you have few calls to `par` that you can easily track and change: add your own implicit wrapper class for parallelizable collection that adds `customPar` method that would call `par` and assign some fixed `tasksupport` object that you can configure. – SergGr Jun 02 '17 at 21:41
  • @SethTisue, I'm not thinking of 2.9; the code I pasted comes from scala.util.Properties of version 2.10.2. To answer your second comment: yes, I've seen the link you pasted, but I don't like relying on reflection for this, as it would strictly depend on the presence of certain fields across versions, which is quite fragile – gmcontessa Jun 03 '17 at 09:46
  • @SergGr, thanks for the suggestion. I would prefer another approach as I'm trying to find a solution which would avoid errors in future code. Using a wrapper works only until the entire team (and new members) remember to use it instead of the standard method. Anyway, I wonder if an implicit wrapper could override the par method. I may give it a go and see if that could be a way forward – gmcontessa Jun 03 '17 at 09:50
  • @gmcontessa, I understand that my suggestion is quite limited, and this is one of the main reasons I didn't put it as an answer. Still, after looking into the code I tend to believe there is no perfect solution (assuming you can't upgrade to Scala 2.11). As to "_if an implicit wrapper could override the `par` method_", AFAIU, you can add a `par` method to the wrapper but you'll lose the "implicit" part because the compiler will prefer the built-in `par` method, and thus you'll have to explicitly create a wrapper to call its `par`, which makes no sense. – SergGr Jun 03 '17 at 09:54
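SergGr's caveat above (that an implicit wrapper can never shadow an existing method, because the compiler only searches for implicits when no member matches) can be demonstrated with any built-in method. Here is a minimal sketch using size as a stand-in for par, with customSize playing the role of the proposed customPar:

```scala
object ImplicitShadowDemo {
  implicit class RichList[A](private val xs: List[A]) {
    // Same name as a built-in member: this is NEVER picked up, because
    // implicit resolution only kicks in when no member matches.
    def size: Int = -1
    // A differently named method (the `customPar` idea) works fine.
    def customSize: Int = xs.length
  }

  def main(args: Array[String]): Unit = {
    println(List(1, 2, 3).size)       // 3 -- built-in wins, the implicit is ignored
    println(List(1, 2, 3).customSize) // 3 -- goes through the implicit wrapper
  }
}
```

This is exactly why the wrapper approach only helps if the whole team remembers to call the custom method instead of the standard one.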

1 Answer


From my point of view, your question contains two different requirements:

One is: "I merely want to explicitly specify the ForkJoinPool to make the parallel collections ExecutionContext independent from the Scala version I use."

I'm not aware that this is possible. What makes me skeptical, above all, is the constructor class ForkJoinTaskSupport(val environment: ForkJoinPool). This constructor is called with the ForkJoinPool backing the current execution context used by .par, which is the global one if I'm not mistaken. A few layers down, we find that this pool is created in ExecutionContextImpl:

def createExecutorService: ExecutorService = {

    [...]

    val desiredParallelism = range(
      getInt("scala.concurrent.context.minThreads", "1"),
      getInt("scala.concurrent.context.numThreads", "x1"),
      getInt("scala.concurrent.context.maxThreads", "x1"))

    val threadFactory = new DefaultThreadFactory(daemonic = true)

    try {
      new ForkJoinPool(
        desiredParallelism,
        threadFactory,
        uncaughtExceptionHandler,
        true) // Async all the way baby
    } catch {
      [...]
    }
  }

So it's not exactly a pool you can swap out, but it is definitely a pool you can configure, which would solve the second formulation of your requirement: "I wondered if it was possible to force Scala to use ForkJoinPool with a preset number of threads decided by us GLOBALLY".
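As a sketch of how those three properties are combined: based on my reading of the ExecutionContextImpl sources (an assumption, not something the snippet above shows), a value prefixed with "x" is treated as a multiplier of the available processors, and range simply clamps the desired value between the floor and the ceiling. A simplified re-implementation:

```scala
object ParallelismConfigDemo {
  // Simplified re-implementation of ExecutionContextImpl's property parsing.
  // Assumption: a value like "x1" means availableProcessors * 1.
  def getInt(name: String, default: String): Int =
    System.getProperty(name, default) match {
      case s if s.startsWith("x") =>
        math.ceil(Runtime.getRuntime.availableProcessors * s.drop(1).toDouble).toInt
      case other => other.toInt
    }

  // Clamp the desired parallelism between the configured floor and ceiling.
  def range(floor: Int, desired: Int, ceiling: Int): Int =
    math.min(math.max(floor, desired), ceiling)

  def main(args: Array[String]): Unit = {
    // In a real application these must be set before ExecutionContext.global
    // is first touched, or passed as JVM options,
    // e.g. -Dscala.concurrent.context.numThreads=8
    System.setProperty("scala.concurrent.context.numThreads", "2")
    System.setProperty("scala.concurrent.context.maxThreads", "8")

    val desiredParallelism = range(
      getInt("scala.concurrent.context.minThreads", "1"),
      getInt("scala.concurrent.context.numThreads", "x1"),
      getInt("scala.concurrent.context.maxThreads", "x1"))

    println(desiredParallelism) // 2: numThreads=2 lies within [1, 8]
  }
}
```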

Full disclaimer: I never tried to do this myself, since I haven't needed it so far, but your question made me want to investigate a bit!

C4stor
  • Yes, I saw that it's possible to configure the parallelism through a system property, so the number of threads is controlled in my application. Those properties default to the number of processors for the ForkJoinPool, which would be fine. But my main problem is still the code I mentioned, which decides the default pool to use, and it looks like it's not possible to override it – gmcontessa Jun 03 '17 at 09:37
  • Can you show how you built the uncaughtExceptionHandler? – dirceusemighini Oct 28 '17 at 20:31
  • It's actually extracted from the scala codebase : https://github.com/scala/scala/blob/v2.11.11/src/library/scala/concurrent/impl/ExecutionContextImpl.scala – C4stor Oct 30 '17 at 16:41