1

I have a huge file (does not fit into memory) which is tab separated with two columns (key and value), and pre-sorted on the key column. I need to call a function on all values for a key and write out the result. For simplicity, one can assume that the values are numbers and the function is addition.

So, given an input:

A 1
A 2
B 1
B 3

The output would be:

A 3
B 4

For this question, I'm not so much interested in reading/writing the file, but more in the list comprehension side. It is important though that the whole content (input as well as output) doesn't fit into memory. I'm new to Scala, and coming from Java I'm interested what would be the functional/Scala way to do that.

Update:

Based on AmigoNico's comment, I came up with the below constant memory solution. Any comments / improvements are appreciated!

val writeAggr = (kv : (String, Int)) => {println(kv._1 + " " + kv._2)}
writeAggr( 
  (  ("", 0) /: scala.io.Source.fromFile("/tmp/xx").getLines ) { (keyAggr, line) => 
    val Array(k,v) = line split ' '
    if (keyAggr._1.equals(k)) {
      (k, keyAggr._2 + v.toInt) 
    } else { 
      if (!keyAggr._1.equals("")) {
        writeAggr(keyAggr)
      }
      (k, v.toInt)
    }
  }
)
benroth
  • 2,468
  • 3
  • 24
  • 25
  • Ah, interesting -- you are using the result of the fold not as an aggregate for the lines, but as a way to get at the residual sum. I'm sure lots of folks will balk at this way of using a fold, but I have to admit that it cleans up the code! – AmigoNico Apr 18 '14 at 07:11
  • You shouldn't really use `.equals` in Scala; you're calling the Java method directly. And `if (!keyAggr._1.equals(""))` could be `if (keyAggr._1.nonEmpty)`. – AmigoNico Apr 18 '14 at 07:14
  • Put the nonEmpty check into writeAggr and it will correctly handle the case where there is no data at all. – AmigoNico Apr 18 '14 at 07:15

2 Answers2

3

This can be done quite elegantly with Scalaz streams (and unlike iterator-based solutions, it's "truly" functional):

import scalaz.stream._

val process =
  io.linesR("input.txt")
    .map { _.split("\\s") }
    .map { case Array(k, v) => k -> v.toInt }
    .pipe(process1.chunkBy2(_._1 == _._1))
    .map { kvs => s"${ kvs.head._1 } ${ kvs.map(_._2).sum }\n" }
    .pipe(text.utf8Encode)
    .to(io.fileChunkW("output.txt"))

Not only will this read from the input, aggregate the lines, and write to the output in constant memory, but you also get nice guarantees about resource management that e.g. source.getLines can't offer.

Travis Brown
  • 138,631
  • 12
  • 375
  • 680
  • Thanks, this is interesting! I'm curious though whether there is a way to do that without 3rd party libraries. – benroth Apr 17 '14 at 16:21
  • Awesome! Just having written the brute-force code to do this with iterators, your answer makes me appreciate the power of Scalaz streams. +1 – AmigoNico Apr 18 '14 at 00:02
2

You probably want to use a fold, like so:

scala> ( ( Map[String,Int]() withDefaultValue 0 ) /: scala.io.Source.fromFile("/tmp/xx").getLines ) { (map,line) =>
  val Array(k,v) = line split ' '
  map + ( k -> ( map(k) + v.toInt ) )
} 
res12: scala.collection.immutable.Map[String,Int] = Map(A -> 3, B -> 4)

Folds are great for accumulating results (unlike for-comprehensions). And since getLines returns an Iterator, only one line is held in memory at a time.

UPDATE: OK, there is a new requirement that we not hold the results in memory either. In that case I think I'd just write a recursive function and use it like so:

scala> val kvPairs = scala.io.Source.fromFile("/tmp/xx").getLines map { line =>
  val Array(k,v) = line split ' '
  ( k, v.toInt )
}
kvPairs: Iterator[(String, Int)] = non-empty iterator

scala> final def loop( key:String, soFar:Int ) {
  if ( kvPairs.hasNext ) {
    val (k,v) = kvPairs.next
    if ( k == key )
      loop( k, soFar+v )
    else {
      println( s"$key $soFar" )
      loop(k,v)
    }
  } else println( s"$key $soFar" )
}
loop: (key: String, soFar: Int)Unit

scala> val (k,v) = kvPairs.next
k: String = A
v: Int = 1

scala> loop(k,v)
A 3
B 4

But the only thing functional about that is that it uses a recursive function rather than a loop. If you are OK with holding all of the values for a particular key in memory you could write a function that iterates over the lines of the file producing an Iterator of Iterators of like-keyed pairs, which you could then just sum and print, but the code would still not be particularly functional and it would be slower.

Travis's Scalaz pipeline solution looks like an interesting one along those lines, but with the iteration hidden behind some handy constructs. If you specifically want a functional solution, I'd say his is the best answer.

AmigoNico
  • 6,652
  • 1
  • 35
  • 45
  • Also, the /: syntax sometimes scares folks new to Scala. If it bothers you, please check out [this post](http://nicholassterling.wordpress.com/2010/07/28/an-intuition-about-scalas-operator-foldleft/). – AmigoNico Apr 17 '14 at 01:43
  • But the output would all be held in the map, right? In my case, it is too big. Since the input is presorted by key, I'm looking for a solution that only holds the information for one key at a time (and then writes out). – benroth Apr 17 '14 at 14:45
  • Yes, if the map couldn't be held in memory then the fold is a bad idea. I've added an update with a recursive solution. – AmigoNico Apr 17 '14 at 23:59
  • thanks for the update - I was interested in the 'Scala-way', i.e. the most elegant way using typical scala constructs. Both answers seem to have their merits. – benroth Apr 18 '14 at 01:19