Parallel collections are gone from the standard library in 2.13, though there is an external library (scala-parallel-collections) you can use. Still, if you are looking to process a large sequence of data in parallel, I would just use Spark, especially since you would need an external library anyway, and Spark runs fine on a single node. Also, when you say you need this solution "for testing", it feels weird to test one solution and then run a completely different one in production.
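For reference, here is roughly what the word count would look like in Spark running in local mode. This is only a sketch: the app name and input path are placeholders, and I am assuming the input is a plain text file.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("word-count")   // placeholder name
  .master("local[*]")      // single node, all local cores
  .getOrCreate()

val counts: Map[String, Int] = spark.sparkContext
  .textFile("input.txt")   // placeholder path
  .flatMap(_.split("""\W+"""))
  .filter(_.nonEmpty)
  .map(w => (w, 1))
  .reduceByKey(_ + _)
  .collect()
  .toMap

spark.stop()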
Here is a solution for you without external libraries (just for completeness):
// First, create a local execution context to allow throttling the parallel jobs
import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}

val parallelism = 4 // how many chunks to process in parallel
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(new ThreadPoolExecutor(
  parallelism, parallelism, 0L, TimeUnit.SECONDS,
  new ArrayBlockingQueue[Runnable](parallelism) {
    // Block instead of rejecting when the queue is full, so submission is throttled
    override def offer(e: Runnable): Boolean = {
      put(e)
      true
    }
  }
))
// Now just split the input into chunks and send them to the executor.
// This does not read anything into memory yet.
val chunkSize = 4096 // how many lines to process at once
val jobs = source
  .getLines()
  .grouped(chunkSize)
  .map { chunk =>
    Future {
      chunk
        .flatMap(_.split("""\W+"""))
        .filter(_.nonEmpty) // drop empty tokens produced by leading non-word characters
        .foldLeft(Map.empty[String, Int]) { case (m, w) =>
          m + (w -> (m.getOrElse(w, 0) + 1))
        }
    }
  }
// Now, combine the results.
// This will fetch `parallelism * chunkSize` lines into memory and start
// `parallelism` jobs processing the chunks. Once one of the jobs completes,
// it will read the next `chunkSize` lines and start another job, etc.
val result: Future[Map[String, Int]] = Future.sequence(jobs.toSeq).map {
  _.reduce { (m1, m2) =>
    m1.foldLeft(m2) { case (m, (w, v)) => m + (w -> (m.getOrElse(w, 0) + v)) }
  }
}
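If you need the final map synchronously (for example in a test), you still have to block on the future at the end; the infinite timeout here is just to keep the example short.

import scala.concurrent.Await
import scala.concurrent.duration.Duration

val wordCounts: Map[String, Int] = Await.result(result, Duration.Inf)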
The key to this is the `ec` implementation, which limits the number of futures currently "in flight". You can wrap it and the chunking logic into a small utility class and make it reusable if you'd like (see the sketch below).
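Something like this, for example. The object and method names are made up for illustration, and blocking with an infinite timeout is just to keep the sketch short.

import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ChunkedParallel {
  // Same trick as above: a fixed-size pool with a blocking queue, so submitting
  // a new chunk blocks once `parallelism` jobs are already queued.
  def throttledContext(parallelism: Int): ExecutionContext =
    ExecutionContext.fromExecutor(new ThreadPoolExecutor(
      parallelism, parallelism, 0L, TimeUnit.SECONDS,
      new ArrayBlockingQueue[Runnable](parallelism) {
        override def offer(e: Runnable): Boolean = { put(e); true }
      }
    ))

  // Processes `input` in chunks of `chunkSize`, at most `parallelism` at a time,
  // and combines the per-chunk results with `combine`. Assumes non-empty input.
  def mapReduceChunks[A, B](input: Iterator[A], chunkSize: Int, parallelism: Int)
                           (process: Seq[A] => B)(combine: (B, B) => B): B = {
    implicit val ec: ExecutionContext = throttledContext(parallelism)
    val jobs = input.grouped(chunkSize).map(chunk => Future(process(chunk)))
    Await.result(Future.sequence(jobs.toSeq).map(_.reduce(combine)), Duration.Inf)
  }
}

With that in place, the word count above becomes a single call where you pass in the per-chunk counting function and the map-merging function.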
Though I would still just use Spark if I were you.