
When I call `par` on collections, it seems to create about 5-10 threads, which is fine for CPU-bound tasks.

But sometimes my tasks are IO-bound, in which case I'd like to have 500-1000 threads pulling from IO concurrently - with 10-15 threads the work is very slow and my CPUs mostly sit idle.

How can I achieve this?

– Ali
    You can look here for an answer - https://stackoverflow.com/questions/9154691/how-to-set-the-number-of-threads-to-use-for-par – Gal Naor Jun 11 '19 at 10:11

1 Answer


You could wrap your blocking IO operations in a `blocking` block:

import scala.concurrent.blocking

(0 to 1000).par.map { i =>
    blocking {
      Thread.sleep(100)
      Thread.activeCount()
    }
}.max // yields 67 on my pc, while without blocking it's 10

But you should ask yourself whether you should be using parallel collections for IO operations at all. Their use case is CPU-heavy tasks.

I would suggest considering futures for IO calls instead.

You should also consider using a custom execution context for that task, because the global execution context is a public singleton and you have no control over what code uses it and for what purpose. You could easily starve parallel computations created by external libraries if you used up all of its threads.

import java.util.concurrent.Executors
import scala.concurrent.{blocking, ExecutionContext, ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}

// or just use scala.concurrent.ExecutionContext.Implicits.global if you don't care
implicit val blockingIoEc: ExecutionContextExecutor = ExecutionContext.fromExecutor(
    Executors.newCachedThreadPool()
)

def fetchData(index: Int): Future[Int] = Future {
    // with the global ec you must mark the computation as blocking so the pool grows;
    // with a custom cached thread pool the thread count grows even without it
    blocking {
      Thread.sleep(100)
      Thread.activeCount()
    }
}

val futures = (0 to 1000).map(fetchData)

Future.sequence(futures).onComplete {
    case Success(data) => println(data.max) // prints about 1000 on my pc
    case Failure(e)    => e.printStackTrace()
}

Thread.sleep(1000) // crude wait so the demo doesn't exit before onComplete runs

EDIT

It's also possible to use a custom ForkJoinPool via ForkJoinTaskSupport:

import java.util.concurrent.ForkJoinPool // scala.concurrent.forkjoin.ForkJoinPool is deprecated
import scala.collection.parallel

val fjpool = new ForkJoinPool(2)
val customTaskSupport = new parallel.ForkJoinTaskSupport(fjpool)

val numbers = List(1, 2, 3, 4, 5).par

numbers.tasksupport = customTaskSupport // assign the custom task support
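To tie this back to the original question: the same tasksupport mechanism can give an IO-bound parallel collection a much larger pool. A minimal sketch, assuming Scala 2.13 with the scala-parallel-collections module (on 2.12, `.par` works without the CollectionConverters import); the pool size of 100 is a hypothetical value, not a recommendation:

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel
import scala.collection.parallel.CollectionConverters._ // 2.13 parallel-collections module

// Size the pool for blocking IO rather than for CPU cores.
val ioPool = new ForkJoinPool(100)

val items = (0 to 1000).par
items.tasksupport = new parallel.ForkJoinTaskSupport(ioPool)

val results = items.map { i =>
  Thread.sleep(10) // stand-in for a blocking IO call
  i * 2
}

ioPool.shutdown()
```

Unlike the `blocking` approach, this puts an explicit upper bound on concurrency, which is usually what you want when hitting an external service.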
– Krzysztof Atłasik
  • Futures are such a pain to handle. Thanks for the info. Is there no way to specify a different ec for certain parallel collection tasks? – Ali Jun 12 '19 at 09:50
  • As far as I know, there is no easy way to pass your own ec to parallel collections. Check [the link](https://stackoverflow.com/questions/9154691/how-to-set-the-number-of-threads-to-use-for-par) provided by Gal Naor. There are ways to set the parallelism level for collections, but the solutions there seem a little hacky and inelegant to me. – Krzysztof Atłasik Jun 12 '19 at 09:55