47

How do I read a large CSV file (> 1 GB) with a Scala Stream? Do you have a code example? Or would you use a different way to read a large CSV file without first loading it all into memory?

Jan Willem Tulp
  • 1,229
  • 2
  • 11
  • 14
  • Do you mean Stream as in the lazily evaluated feature? It's presumably possible, but not required – reading a file line-by-line is in essence already lazy. I'm not very up to speed with Scala I/O yet, but getLines (from a quick browse of the source) is also implemented in a lazy fashion – does it read the whole file into memory? – The Archetypal Paul Nov 23 '10 at 10:46
  • I believe it does read into memory, since I get an OutOfMemoryException when using scala.io.Source.fromFile() and then getLines(). So using a Stream class sounds like a valid alternative, right? – Jan Willem Tulp Nov 23 '10 at 10:49
  • I strongly suggest you use a well-maintained RFC driven native Scala library which optimally handles this problem, kantan.csv: https://nrinaudo.github.io/kantan.csv – chaotic3quilibrium Aug 30 '20 at 20:17

3 Answers

78

Just use Source.fromFile(...).getLines as you already stated.

That returns an Iterator, which is already lazy. (You'd use a Stream as a lazy collection where you wanted previously retrieved values to be memoized, so you can read them again.)

If you're getting memory problems, then the problem will lie in what you're doing after getLines. Any operation like toList, which forces a strict collection, will cause the problem.
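For example, here's a minimal sketch (the function name and the column-summing task are illustrative, not from the question) that aggregates one column of a large CSV without ever materialising the file:

```scala
import scala.io.Source

// Sums one numeric column of a CSV. The Iterator from getLines yields one
// line at a time, and .sum folds incrementally, so memory use stays flat
// no matter how large the file is. Only a strict operation such as toList
// would pull everything into memory.
def sumColumn(path: String, column: Int): Double = {
  val source = Source.fromFile(path)
  try {
    // drop(1) skips the header line
    source.getLines().drop(1).map(_.split(",")(column).toDouble).sum
  } finally {
    source.close()
  }
}
```

Note that the naive split(",") carries the caveat raised in the comments: it mishandles quoted fields containing commas or line breaks.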

Kevin Wright
  • 49,540
  • 9
  • 105
  • 155
  • 1
    I guess the OutOfMemoryException is indeed caused by the operations afterwards. Thanks! – Jan Willem Tulp Nov 23 '10 at 11:15
  • Dealing with an iterator may not be good when your business logic needs to traverse it several times to calculate something, since you are able to consume an iterator only once. In that case it seems better to deal with a Stream, as in this question: http://stackoverflow.com/questions/17004455/scala-iterator-and-stream-example-stream-fails-on-reuse – ses Jun 09 '13 at 03:54
  • There is an error in this approach. It specifically corrupts on column values which contain a valid line break. Because of so many issues, even in the presence of an RFC for the .csv MIME-type, I strongly suggest you use a well-maintained RFC driven native Scala library which optimally handles this problem, kantan.csv: https://nrinaudo.github.io/kantan.csv – chaotic3quilibrium Aug 30 '20 at 20:20
14

I hope you don't mean Scala's collection.immutable.Stream when you say Stream. This is not what you want: Stream is lazy, but it also memoizes every element it evaluates.

I don't know what you plan to do, but just reading the file line-by-line should work very well without using high amounts of memory.

getLines should evaluate lazily and should not crash (as long as your file does not have more than 2³² lines, afaik). If it does, ask on #scala or file a bug ticket (or do both).
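To illustrate the memoization (a standalone sketch, not from this answer; note that in Scala 2.13+ Stream has been superseded by LazyList), the counter below is bumped once per element evaluated, and a second traversal of the same Stream reuses the cached values instead of recomputing them:

```scala
// Stream caches each element after it is first evaluated, so traversing
// it twice computes each element only once -- which also means a Stream
// over a 1 GB file keeps every line it has read reachable in memory.
var evaluations = 0
val memoized: Stream[Int] =
  Stream.from(0).map { i => evaluations += 1; i }.take(3)

memoized.foreach(_ => ()) // first traversal evaluates the elements
memoized.foreach(_ => ()) // second traversal hits the memoized values
```

This is exactly why an Iterator, which discards elements as they are consumed, is the right tool for a one-pass read of a large file.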

soc
  • 27,983
  • 20
  • 111
  • 215
4

UPDATE 2020/08/30: Please use the Scala library, kantan.csv, for the most accurate and correct implementation of RFC 4180, which defines the .csv MIME-type.

While I enjoyed the learning process I experienced creating the solution below, please refrain from using it, as I have found a number of issues with it, especially at scale. To avoid the obvious technical debt arising from my solution below, choosing a well-maintained, RFC-driven, Scala-native solution is how you should take care of your current and future clients.


If you are looking to process the large file line-by-line while avoiding loading the entire file's contents into memory all at once, then you can use the Iterator returned by scala.io.Source.

I have a small function, tryProcessSource, (containing two sub-functions) which I use for exactly these types of use-cases. The function takes up to four parameters, of which only the first is required. The other parameters have sane default values provided.

Here's the function profile (full function implementation is at the bottom):

def tryProcessSource(
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  retainValues: (Int, List[String]) => Option[List[String]] =
    (index, parsedValues) => Some(parsedValues)
): Try[List[List[String]]] = {
  ???
}

The first parameter, file: File, is required. And it is just any valid instance of java.io.File which points to a line-oriented text file, like a CSV.

The second parameter, parseLine: (Int, String) => Option[List[String]], is optional. And if provided, it must be a function expecting to receive two input parameters; index: Int, unparsedLine: String. And then return an Option[List[String]]. The function may return a Some wrapped List[String] consisting of the valid column values. Or it may return a None which indicates the entire streaming process is aborting early. If this parameter is not provided, a default value of (index, line) => Some(List(line)) is provided. This default results in the entire line being returned as a single String value.

The third parameter, filterLine: (Int, List[String]) => Option[Boolean], is optional. And if provided, it must be a function expecting to receive two input parameters; index: Int, parsedValues: List[String]. And then return an Option[Boolean]. The function may return a Some wrapped Boolean indicating whether this particular line should be included in the output. Or it may return a None which indicates the entire streaming process is aborting early. If this parameter is not provided, a default value of (index, values) => Some(true) is provided. This default results in all lines being included.

The fourth and final parameter, retainValues: (Int, List[String]) => Option[List[String]], is optional. And if provided, it must be a function expecting to receive two input parameters; index: Int, parsedValues: List[String]. And then return an Option[List[String]]. The function may return a Some wrapped List[String] consisting of some subset and/or alteration of the existing column values. Or it may return a None which indicates the entire streaming process is aborting early. If this parameter is not provided, a default value of (index, values) => Some(values) is provided. This default results in the values parsed by the second parameter, parseLine, being passed through unchanged.
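As a concrete illustration (the name keepStreetAndZip is hypothetical, not part of the original function), a retainValues argument that keeps only the street and zip columns of each parsed row would look like this:

```scala
// Hypothetical retainValues argument: keep only the street (index 0) and
// zip (index 4) columns of each parsed row. Returning None instead would
// abort the entire traversal early.
val keepStreetAndZip: (Int, List[String]) => Option[List[String]] =
  (index, parsedValues) => Some(List(parsedValues(0), parsedValues(4)))
```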

Consider a file with the following contents (4 lines):

street,street2,city,state,zip
100 Main Str,,Irving,TX,75039
231 Park Ave,,Irving,TX,75039
1400 Beltline Rd,Apt 312,Dallas,Tx,75240

The following calling profile...

val tryLinesDefaults =
  tryProcessSource(new File("path/to/file.csv"))

...results in this output for tryLinesDefaults (the unaltered contents of the file):

Success(
  List(
    List("street,street2,city,state,zip"),
    List("100 Main Str,,Irving,TX,75039"),
    List("231 Park Ave,,Irving,TX,75039"),
    List("1400 Beltline Rd,Apt 312,Dallas,Tx,75240")
  )
)

The following calling profile...

val tryLinesParseOnly =
  tryProcessSource(
      new File("path/to/file.csv")
    , parseLine =
        (index, unparsedLine) => Some(unparsedLine.split(",").toList)
  )

...results in this output for tryLinesParseOnly (each line parsed into the individual column values):

Success(
  List(
    List("street","street2","city","state","zip"),
    List("100 Main Str","","Irving","TX","75039"),
    List("231 Park Ave","","Irving","TX","75039"),
    List("1400 Beltline Rd","Apt 312","Dallas","Tx","75240")
  )
)

The following calling profile...

val tryLinesIrvingTxNoHeader =
  tryProcessSource(
      new File("C:/Users/Jim/Desktop/test.csv")
    , parseLine =
        (index, unparsedLine) => Some(unparsedLine.split(",").toList)
    , filterLine =
        (index, parsedValues) =>
          Some(
            (index != 0) && //skip header line
            (parsedValues(2).toLowerCase == "Irving".toLowerCase) && //only Irving
            (parsedValues(3).toLowerCase == "Tx".toLowerCase)
          )
  )

...results in this output for tryLinesIrvingTxNoHeader (each line parsed into the individual column values, no header and only the two rows in Irving,Tx):

Success(
  List(
    List("100 Main Str","","Irving","TX","75039"),
    List("231 Park Ave","","Irving","TX","75039")
  )
)

Here's the entire tryProcessSource function implementation:

import scala.io.Source
import scala.util.Try

import java.io.File

def tryProcessSource(
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  retainValues: (Int, List[String]) => Option[List[String]] =
    (index, parsedValues) => Some(parsedValues)
): Try[List[List[String]]] = {
  def usingSource[S <: Source, R](source: S)(transfer: S => R): Try[R] =
    try {Try(transfer(source))} finally {source.close()}
  def recursive(
    remaining: Iterator[(String, Int)],
    accumulator: List[List[String]],
    isEarlyAbort: Boolean =
      false
  ): List[List[String]] = {
    if (isEarlyAbort || !remaining.hasNext)
      accumulator
    else {
      val (line, index) =
        remaining.next
      parseLine(index, line) match {
        case Some(values) =>
          filterLine(index, values) match {
            case Some(keep) =>
              if (keep)
                retainValues(index, values) match {
                  case Some(valuesNew) =>
                    recursive(remaining, valuesNew :: accumulator) //capture values
                  case None =>
                    recursive(remaining, accumulator, isEarlyAbort = true) //early abort
                }
              else
                recursive(remaining, accumulator) //discard row
            case None =>
              recursive(remaining, accumulator, isEarlyAbort = true) //early abort
          }
        case None =>
          recursive(remaining, accumulator, isEarlyAbort = true) //early abort
      }
    }
  }
  Try(Source.fromFile(file)).flatMap(
    bufferedSource =>
      usingSource(bufferedSource) {
        source =>
          recursive(source.getLines().buffered.zipWithIndex, Nil).reverse
      }
  )
}

While this solution is relatively succinct, it took me considerable time and many refactoring passes before I was finally able to get to here. Please let me know if you see any ways it might be improved.


UPDATE: I have just asked the issue below as its own StackOverflow question. And it now has an answer fixing the error mentioned below.

I had the idea to try and make this even more generic changing the retainValues parameter to transformLine with the new generics-ified function definition below. However, I keep getting the highlight error in IntelliJ "Expression of type Some[List[String]] doesn't conform to expected type Option[A]" and wasn't able to figure out how to change the default value so the error goes away.

def tryProcessSource2[A <: AnyRef](
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  transformLine: (Int, List[String]) => Option[A] =
    (index, parsedValues) => Some(parsedValues)
): Try[List[A]] = {
  ???
}

Any assistance on how to make this work would be greatly appreciated.

Community
  • 1
  • 1
chaotic3quilibrium
  • 5,661
  • 8
  • 53
  • 86