
Is there a simple way to use Scala parallel collections without loading a full collection into memory?

For example, I have a large collection and I'd like to perform a particular operation (a fold) in parallel on only a small chunk that fits into memory, then on another chunk, and so on, and finally recombine the results from all chunks.

I know that actors could be used, but it would be really nice to use par-collections.

I've written a solution, but it isn't nice:

  def split[A](list: Iterable[A], chunkSize: Int): Iterable[Iterable[A]] = {
    new Iterator[Iterable[A]] {
      var rest = list
      def hasNext = !rest.isEmpty
      def next = {
        val chunk = rest.take(chunkSize)
        rest = rest.drop(chunkSize)
        chunk
      }
    }.toIterable
  }                                               

  def foldPar[A](acc: A)(list: Iterable[A], chunkSize: Int, combine: ((A, A) => A)): A = {
    val chunks: Iterable[Iterable[A]] = split(list, chunkSize)
    def combineChunk: ((A,Iterable[A]) => A) = { case (res, entries) => entries.par.fold(res)(combine) }
    chunks.foldLeft(acc)(combineChunk)
  }                                               

  val chunkSize = 10000000
  val x = 1 to chunkSize*10

  // Note: the Int sum over this range overflows; use Long for a meaningful total.
  def sum: ((Int,Int) => Int) = {case (acc,n) => acc + n }

  foldPar(0)(x,chunkSize,sum)
Mikhail Golubtsov
  • I would say that the correct computation model here is *map reduce* (and thus it could be [Spark](http://spark-project.org/examples/)), not actors per se. – om-nom-nom Jun 30 '13 at 19:44
  • Formally - yes, but processing time isn't sensible in this case so it is totally ok to run on a single machine. – Mikhail Golubtsov Jun 30 '13 at 20:31

1 Answer


Your idea is very neat and it's a pity there is no such function available already (AFAIK).

I just rephrased your idea into somewhat shorter code. First, I feel that for parallel folding it's useful to use the concept of a monoid: a structure with an associative operation and a zero element. Associativity is important because we don't know the order in which results that are computed in parallel get combined. The zero element is important so that we can split the computation into blocks and start folding each one from zero. There is nothing new about this, though; it's just what fold for Scala's collections expects.

// The function defined by Monoid's apply must be associative
// and zero its identity element.
trait Monoid[A]
  extends Function2[A,A,A]
{
  val zero: A
}
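
To see why associativity matters, here is a small sketch that is not part of the original answer. It folds the same numbers once in a single pass and once block by block. The helper name `blockFold` is made up for illustration, and the blocks are folded sequentially so the example has no dependencies beyond the standard library.

```scala
// Hypothetical helper: fold each block starting from `zero`,
// then fold the per-block results, again starting from `zero`.
def blockFold(xs: Seq[Int], blockSize: Int)(zero: Int)(op: (Int, Int) => Int): Int =
  xs.grouped(blockSize).map(_.fold(zero)(op)).fold(zero)(op)

val xs = 1 to 10

// Addition is associative with identity 0, so blocking does not change the result.
val plainSum = xs.fold(0)(_ + _)
val blockSum = blockFold(xs, 3)(0)(_ + _)

// Subtraction is not associative, so the blocked result differs from the plain one.
val plainDiff = xs.fold(0)(_ - _)
val blockDiff = blockFold(xs, 3)(0)(_ - _)

println(s"sum: $plainSum vs $blockSum, difference: $plainDiff vs $blockDiff")
```

With addition both folds agree; with subtraction they do not, which is exactly the kind of operation a `Monoid` instance must not wrap.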

Next, Scala's Iterators already have a useful method grouped(Int): GroupedIterator[Seq[A]] which slices the iterator into fixed-size sequences. It's quite similar to your split. This allows us to cut the input into fixed-size blocks and then apply Scala's parallel collection methods on them:

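// Note: on Scala 2.13+, `.par` needs the scala-parallel-collections module
// and `import scala.collection.parallel.CollectionConverters._`.
def parFold[A](c: Iterator[A], blockSize: Int)(implicit monoid: Monoid[A]): A =
  c.grouped(blockSize).map(_.par.fold(monoid.zero)(monoid))
                      .fold(monoid.zero)(monoid)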

We fold each block using the parallel collections framework and then (without any parallelization) combine the intermediate results.
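
The reason this keeps memory bounded is that `grouped` builds blocks on demand. A minimal way to convince yourself, sketched here with an endless iterator and a hypothetical counter `pulled` (neither is from the code above): if `grouped` tried to read everything up front, asking for the first block would never return.

```scala
// An endless iterator that counts how many elements have been requested from it.
var pulled = 0
val endless = Iterator.from(1).map { x => pulled += 1; x }

// grouped is lazy: requesting one block only consumes enough input to fill it.
val blocks = endless.grouped(3)
val firstBlock = blocks.next()

println(s"first block: $firstBlock, elements pulled so far: $pulled")
```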

An example:

object SumMonoid extends Monoid[Long] {
  override val zero: Long = 0
  override def apply(x: Long, y: Long) = x + y
}
val it = Iterator.range(1, 10000001).map(_.toLong)
println(parFold(it, 100000)(SumMonoid))
Petr
  • Nice usage of a monoid, I never knew about it before. Concerning the grouped method, I suspected that it could load the whole thing into memory, but it turned out that it doesn't. – Mikhail Golubtsov Jul 05 '13 at 06:53
  • I'll test your solution a little bit later, but it seems that it should work and it's much more concise. Many thanks! – Mikhail Golubtsov Jul 05 '13 at 07:05
  • @MikhailGolubtsov Please let me know how your testing goes, I'm also curious. I've done only some very basic tests myself. – Petr Jul 05 '13 at 08:21
  • So I've run the processing on a big collection and didn't run out of heap space, so it actually works. But in my task I noticed that there is only _some_ parallelism, and the entries in the iterator are preprocessed in a single thread. So it could be improved further. – Mikhail Golubtsov Jul 06 '13 at 22:56
  • @MikhailGolubtsov Yes, I'm aware of that. First, at the end of each inner parallel fold it can happen that not all cores are fully used, if one part of a block takes longer to compute than the others. Second, if calling `next` on the main iterator takes some measurable time, this won't be parallelized. – Petr Jul 07 '13 at 07:28
  • @MikhailGolubtsov Perhaps a solution for the second problem would be to run two threads, one that would produce grouped blocks from the iterator and pass it to the second using something like a bounded queue. This way, consuming the iterator would be parallelized with processing of the grouped blocks. – Petr Jul 07 '13 at 07:40
  • My idea was not to use any explicit threading, for the sake of simplicity. I think I should take a deeper look at Scala par-collections (for instance ParIterable), but they don't look simple to me :) – Mikhail Golubtsov Jul 07 '13 at 10:23
  • @MikhailGolubtsov I'm not sure if `ParIterable` will not read the whole collection before splitting it and running parallel processing on it. – Petr Jul 07 '13 at 10:58
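
The bounded-queue idea from the comments could look roughly like the sketch below. It is an illustration under assumptions, not code from the thread: the name `pipelinedFold` and its parameters are hypothetical, the end of input is signalled with `None`, and each block is folded sequentially to keep the example dependency-free (the line marked in the comments is where `block.par.fold` would go).

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Hypothetical helper: one background task reads blocks from the iterator,
// while the calling thread folds them. The bounded queue limits how many
// blocks are held in memory at the same time.
def pipelinedFold[A](it: Iterator[A], blockSize: Int, queueSize: Int)
                    (zero: A)(op: (A, A) => A): A = {
  val queue = new ArrayBlockingQueue[Option[Seq[A]]](queueSize)

  // Producer: put blocks into the queue; always finish with None,
  // even on failure, so that the consumer cannot wait forever.
  val producer = Future {
    try it.grouped(blockSize).foreach(block => queue.put(Some(block)))
    finally queue.put(None)
  }

  // Consumer: fold blocks as they arrive until the end marker shows up.
  var acc = zero
  var done = false
  while (!done) {
    queue.take() match {
      case Some(block) => acc = op(acc, block.fold(zero)(op)) // block.par.fold would go here
      case None        => done = true
    }
  }

  // Rethrows any exception raised while reading the iterator.
  Await.result(producer, Duration.Inf)
  acc
}

val total = pipelinedFold(Iterator.range(1, 100001).map(_.toLong), 1000, 4)(0L)(_ + _)
println(total)
```

This overlaps reading the iterator with folding the blocks, at the cost of the explicit threading the asker wanted to avoid.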