
I need to run a word count on an input file of up to a million lines in Scala. Each line is also very long (>150K characters). The following is the standard program that works:

val wordCount = scala.io.Source.fromFile("sample.dat")
  .getLines
  .flatMap(_.split("\\W+"))
  .foldLeft(Map.empty[String, Int]) {
    (count, word) => count + (word -> (count.getOrElse(word, 0) + 1))
  }

The modification below fails with the error `value par is not a member of Iterator[String]`:

val wordCount = scala.io.Source.fromFile("sample.dat")
  .getLines
  .flatMap(_.split("\\W+"))
  .par
  .foldLeft(Map.empty[String, Int]) {
    (count, word) => count + (word -> (count.getOrElse(word, 0) + 1))
  }

I am surprised by this, as similar programs seem to work.

Further, I am wondering whether `par.reduce` would be faster and more efficient than a working `par.foldLeft`.

I would be grateful for any help or leads on this issue.

TIA

Quiescent

1 Answer


Check this out. Parallel collections are gone from the standard library in 2.13, though there is an external library you can use. Still, if you are looking to process a large sequence of data in parallel, I would say just use Spark, especially since you are going to need an external library anyway. You can run Spark on a single node, and when you say you need this solution "for testing", it feels weird to test one solution and then run a completely different one.
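If you do want the external-library route, a minimal sketch might look like the following (assuming the `scala-parallel-collections` module is on the classpath). Note that `par` only exists on strict collections, which is why the `Iterator` returned by `getLines` has no `par`; you have to materialize it first (e.g. with `toVector`), which pulls the whole file into memory. Also, `foldLeft` on a parallel collection still runs sequentially (its signature cannot be parallelized), so `aggregate`, which takes a separate merge function, is used instead; this also answers the `par.reduce` vs. `par.foldLeft` question, since `reduce` can run in parallel provided the operation is associative.

// Sketch only: assumes "org.scala-lang.modules" %% "scala-parallel-collections" is a dependency (Scala 2.13).
import scala.collection.parallel.CollectionConverters._

val wordCount = scala.io.Source.fromFile("sample.dat")
  .getLines()
  .flatMap(_.split("\\W+"))
  .toVector // materialize: an Iterator has no `par`
  .par
  .aggregate(Map.empty[String, Int])(
    // fold words within one partition
    (count, word) => count + (word -> (count.getOrElse(word, 0) + 1)),
    // merge the per-partition maps
    (m1, m2) => m1.foldLeft(m2) { case (m, (w, c)) => m + (w -> (m.getOrElse(w, 0) + c)) }
  )

Given lines this long, materializing everything first is probably not what you want, which brings us to the chunked approach below.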

Here is a solution for you without external libraries (just for completeness):

import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}

// First, create a local execution context to allow throttling the parallel jobs
val parallelism = 4 // how many chunks to process in parallel

implicit val ec = ExecutionContext.fromExecutor(new ThreadPoolExecutor(
  parallelism, parallelism, 0L, TimeUnit.SECONDS,
  new ArrayBlockingQueue[Runnable](parallelism) {
    // A blocking `offer` makes the pool push back on the producer
    // instead of rejecting work once the queue is full.
    override def offer(e: Runnable) = {
      put(e)
      true
    }
  }
))

val source = scala.io.Source.fromFile("sample.dat") // the input file from the question

// Now just split the input into chunks and send them to the executor.
// This does not read anything into memory yet.

val chunkSize = 4096 // how many lines to process at once

val jobs = source
  .getLines
  .grouped(chunkSize)
  .map { chunk =>
    Future {
      chunk
        .flatMap { _.split("""\W+""") }
        .foldLeft(Map.empty[String, Int]) { case (m, w) =>
          m + (w -> (m.getOrElse(w, 0) + 1))
        }
    }
  }

// Now, combine the results.
// This will fetch `parallelism*chunkSize` lines into memory and start
// `parallelism` jobs processing the chunks. Once one of the jobs completes,
// it will read the next `chunkSize` lines and start another job. Etc.
val result: Future[Map[String, Int]] = Future.sequence(jobs.toSeq).map {
  _.reduce { (m1, m2) =>
    m1.foldLeft(m2) { case (m, (w, v)) => m + (w -> (m.getOrElse(w, 0) + v)) }
  }
}

The key to this is the `ec` implementation limiting the number of futures currently "in flight". You can wrap it and the chunking logic into a small utility class and make it reusable if you'd like. Though I would still just use Spark if I were you.
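If you run this as a standalone batch job, a minimal sketch for collecting the result could look like this (assuming blocking on the future is acceptable in your case):

import scala.concurrent.Await
import scala.concurrent.duration.Duration

// Block until every chunk has been processed and merged.
val counts: Map[String, Int] = Await.result(result, Duration.Inf)
println(s"distinct words: ${counts.size}")
source.close()
// Note: the ThreadPoolExecutor's worker threads are non-daemon, so keep a
// reference to the executor and call shutdown() if the JVM should exit afterwards.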

Dima
  • I have never used futures :( and have tried the following way from what I could gather from available sources. I imported `Future`, `ExecutionContextExecutor`, `ExecutionContext`, `FutureConverters._` from scala and `ThreadPoolExecutor`, `TimeUnit`, `ArrayBlockingQueue` from java and also `ExecutionContext.Implicits.global`. Replaced `source` with input file source. Still I get `error: not enough arguments for method sequence: ...` though it should not be the case at all as I could gather from other sources. Am I using some wrong imports? – Quiescent May 20 '21 at 13:37
  • @Quiescent sorry, I fat-fingered some stuff there. Should be fixed now. Here is a [scastie snippet](https://scastie.scala-lang.org/J0rGSBRHRIqObigGz2bz5Q) for you with all the imports. If you are using scala you _gotta_ get familiar with futures, _especially_ if you are reluctant to use spark for parallelism. – Dima May 20 '21 at 18:42
  • I am grateful for your help. I have run the snippet and it works fine. I use Scala for Spark, which is already in use, but this single-node implementation is an academic exercise to port to a non-cluster version. Intrinsic parallelism is more useful as I don't need to worry about the number of cores (nc), and since I can get nc through Scala itself, it's not an issue. Thanks a lot again. – Quiescent May 20 '21 at 19:38
  • You can run Spark locally too; it doesn't have to be a cluster – Dima May 20 '21 at 19:40
  • Yes, but the user will have to install and configure. This is easier for the less gifted. :) – Quiescent May 20 '21 at 19:40