6

I have CSV files with comments that give column names, where the columns change throughout the file:

#c1,c2,c3
a,b,c
d,e,f
#c4,c5
g,h
i,j

I want to provide a way to iterate over (only) the data rows of the file as Maps of column names to values (all Strings). So the above would become:

Map(c1 -> a, c2 -> b, c3 -> c)
Map(c1 -> d, c2 -> e, c3 -> f)
Map(c4 -> g, c5 -> h)
Map(c4 -> i, c5 -> j)

The files are very large, so reading everything into memory is not an option. Right now I have an Iterator class that keeps some ugly state between hasNext() and next(); I also provide accessors for the current line number and the actual last line and comment read (in case consumers care about field order). I'd like to try to do things in a more functional way.

My first idea was a for comprehension: I can iterate over the lines of the file, skipping the comment lines with a filter clause. I can yield a tuple containing the map, the line number, etc. The problem is I need to remember the last column names seen so I can create Maps from them. For loops understandably try to discourage keeping state, by only letting you set new vals. I learned from this question that I can update member variables in the yield block, but that's precisely when I don't want to update them in my case!

I could call a function in the iteration clause that updates state, but that seems dirty. So, what is the best way to do this in a functional style? Abuse for comprehensions? Hack scanLeft? Use a library? Bring out the parser combinator big guns? Or is a functional style just not a good match for this problem?

Community
  • 1
  • 1
Jay Hacker
  • 1,835
  • 2
  • 18
  • 23
  • 1
    I think [this](http://stackoverflow.com/questions/7293617/split-up-a-list-at-each-element-satisfying-a-predicate-scala) task is quite simular to the part where you trying to split lines by #-headers. – om-nom-nom Sep 12 '11 at 15:45
  • @om-nom-nom. My idea exactly ;-) – Didier Dupont Sep 12 '11 at 16:13
  • Sorry, I should have mentioned that I need an Iterator or Traversable because the files don't fit into memory. Is a similar approach feasible without a List? – Jay Hacker Sep 12 '11 at 17:05

7 Answers7

5

State Monad FTW!

Actually, I suck at State monad. I had a hell of a time writing this up, and I have a strong feeling that it could be made much better. In particular, it seems to me that traverse is the way to go, but...

// Get Scalaz on the job
import scalaz._
import Scalaz._

// Some type aliases to make stuff clearer
type Input         = Stream[String]
type Header        = String
type InternalState = (Input, Header)
type Output        = Option[(Header, String)]
type MyState       = State[InternalState, Output]

// Detect headers
def isHeader(line: String) = line(0) == '#'

// From a state, produce an output
def makeLine: (InternalState => Output) = {
    case (head #:: _, _) if isHeader(head) => None
    case (head #:: _, header)              => Some(header -> head)
    case _                                 => None
}

// From a state, produce the next state
def nextLine: (InternalState => InternalState) = {
    case (head #:: tail, _) if isHeader(head) => tail -> head
    case (_ #:: tail, header)                 => tail -> header
    case _                                    => Stream.empty -> ""
}

// My state is defined by the functions producing the next state
// and the output
val myState: MyState = state(s => nextLine(s) -> makeLine(s))    

// Some input to test it. I'm trimming it to avoid problems on REPL
val input = """#c1,c2,c3
a,b,c
d,e,f
#c4,c5
g,h
i,j""".lines.map(_.trim).toStream

// My State/Output Stream -- def to avoid keeping a reference to the head
def stateOutputStream = Stream.iterate(myState(input, "")){ 
        case (s, _) => myState(s) 
    } takeWhile { case ((stream, _), output) => stream.nonEmpty || output.nonEmpty }

// My Output Stream -- flatMap gets rid of the None from the headers
def outputStream = stateOutputStream flatMap { case (_, output) => output }

// Now just get the map
def outputToMap: (Header, String) => Map[String, String] = {
    case (header, line) =>
        val keys = header substring 1 split ","
        val values = line split ","
        keys zip values toMap
}

// And this is the result -- note that I'm still avoiding "val" so memory
// won't leak
def result = outputStream map outputToMap.tupled
Daniel C. Sobral
  • 295,120
  • 86
  • 501
  • 681
3

Here's one way you could do this with Iteratees. The stream is represented as a function from Iteratee to Iteratee, so it's never actually realized in memory. I'm using the State monad to track the last encountered header.

import scalaz._
import Scalaz._
import IterV._

type Header = List[String]
type MyState[A] = State[Header, A]
type Out = Map[String, String]

// Detect headers
def isHeader(line: String) = line(0) == '#'

type Enumeratee[A, B, C] =
  IterV[B, C] => Iteratee[MyState, A, IterV[B, C]]

// Enumerate a list. Just for demonstration.
def enumerateM[M[_]: Monad, E, A]:
  (List[E], Iteratee[M, E, A]) => Iteratee[M, E, A] = {
    case (Nil, i) => i
    case (x :: xs, Iteratee(m)) => Iteratee(for {
      v <- m
      o <- v match {
        case d@DoneM(_, _) => d.pure[M]
        case ContM(k) => enumerateM.apply(xs, k(El(x))).value
      }
    } yield o)
  }

def stateTrans[A]: Enumeratee[String, Map[String, String], A] =
  i => Iteratee(i.fold(
         done = (_, _) => DoneM(i, Empty.apply).pure[MyState],
         cont = k => ContM((x: Input[String]) => x match {
           case El(e) => Iteratee[MyState, String, IterV[Out, A]](for {
             h <- init
             o <- if (isHeader(e))
                    put(e substring 1 split "," toList) map (_ => Empty[Out])
                  else El((h zip (e split ",")).toMap).pure[MyState]
             v <- stateTrans(k(o)).value
           } yield v)
           case Empty() => stateTrans(k(Empty.apply))
           case EOF() => stateTrans(k(EOF.apply))
         }).pure[MyState]
       ))

Let's test this and take the head of the output stream:

scala> (enumerateM[MyState, String, IterV[Out, Option[Out]]].apply(
     | List("#c1,c2,c3","a,b,c","d,e,f"), stateTrans(head)).value ! List())
     | match { case DoneM(a, _) => a match { case Done(b, _) => b } }
res0: Option[Out] = Some(Map(c1 -> a, c2 -> b, c3 -> c))

This could be made much nicer by abstracting some of this stuff out to helper functions.

Apocalisp
  • 34,834
  • 8
  • 106
  • 155
2

Here is a possible solution:

First have a look at the answer to Split up a list at each element satisfying a predicate (Scala), which will give you a groupPrefix function. You get a method groupPrefix, which split a list into a list of list, split occurrings when an item satisfy a given predicate. This way, you split you have list starting with each comment line (columns definition), and containing corresponding data afterwards

This routine will then transform one of the sublist (starting with column names) in the list of corresponding map.

import scala.collection.immutable.ListMap 
  // to keep the order of the columns. If not needed, just use Map
def toNamedFields(lines: List[String]) : List[Map[String, String]] = {
  val columns = lines.head.tail.split(",").toList // tail to discard the #
  lines.tail.map{line => ListMap(columns.zip(line.split(",")): _*)}
}

With that, you split your lines, get the maps in each group, getting a list of list of map which you turn into a single list with flatten

groupPrefix(lines){_.startsWith("#")}.map(toNamedFields).flatten
Community
  • 1
  • 1
Didier Dupont
  • 29,398
  • 7
  • 71
  • 90
2

Could probably be more elegant, but you'll get the drill:

  def read(lines: Iterator[String], currentHeadings: Option[Seq[String]] = None): Stream[Option[Map[String, String]]] = 
    if (lines.hasNext) {
      val l = lines.next
      if (l.startsWith("#"))
        Stream.cons(
          None,
          read(lines, Some(l.tail.split(","))))
      else
        Stream.cons(
          currentHeadings.map(_.zip(l.split(",")).toMap),
          read(lines, currentHeadings))
    } else Stream.cons(None, Stream.Empty)

  def main(args: Array[String]): Unit = {
    val lines = scala.io.Source.fromFile("data.csv").getLines
    println(read(lines).flatten.toList)
  }

Prints:

List(Map(c1 -> a, c2 -> b, c3 -> c), Map(c1 -> d, c2 -> e, c3 -> f), Map(c4 -> g, c5 -> h), Map(c4 -> i, c5 -> j))
Nicolas Payette
  • 14,847
  • 1
  • 27
  • 37
  • Of course, you don't have to `toList` it. `read(lines).flatten` will give you a lazy stream. – Nicolas Payette Sep 12 '11 at 19:53
  • Caveat: since `Stream` memoïzes the values, you need to be careful not to hold on to the head of the stream. Maybe someone can think of a way of doing this with iterators all the way, so you don't have that problem. – Nicolas Payette Sep 12 '11 at 19:56
2

Well here's the Python...

from collections import namedtuple

def read_shifty_csv(csv_file):
    cols = None
    for line in csv_file:
        line = line.strip()
        if line.startswith('#'):
            cols = namedtuple('cols', line[1:].split(','))
        else:
            yield cols(*line.split(','))._asdict()

Drop the _asdict() call if you'd rather work with a tuple than a mapping (dict). Only materializes a row at a time in memory.

Edit to try to be slightly more functional:

from collections import namedtuple
from itertools import imap

def read_shifty_csv(csv_file):
    cols = None
    for line in imap(str.strip, csv_file):
        if line.startswith('#'):
            cols = namedtuple('cols', line[1:].split(','))
        else:
            yield cols(*line.split(','))._asdict()

Just dropped the evil reassignment of line = line.strip()

schmichael
  • 412
  • 3
  • 8
  • Funny story: that's like one of the two use cases of namedtuple's ability to accept filed names as comma separated string. So just drop the split :) – Armin Ronacher Sep 15 '11 at 00:38
  • @ArminRonacher Honestly I don't even want people to know that exists. ;) But you're right, thanks to an ugly API hack this code could be simpler. – schmichael Sep 15 '11 at 00:42
  • 2
    You are re-assigning values to `cols`. That's not functional. – Daniel C. Sobral Sep 15 '11 at 01:50
  • Be careful of quoted csv values; i.e. "this, is a valid", line – dcolish Sep 15 '11 at 03:22
  • @DanielCSobral Indeed. Given the complexity of the purely functional approaches I couldn't resist trying an approach that minimized state but wasn't purely functional. Sorry for the noise. – schmichael Sep 15 '11 at 04:02
  • As a fellow Pythonista, I really want to upvote your answer, even though it is the wrong language. This is a great (or unfortunate) example of the practicality of the functional vs. imperative approaches: do I really want to understand and maintain all that functional code for the dubious benefit of chasing an ideal, and the likely cost of lower performance? Just between you and I, if Python was as fast as Scala, there would be no contest. – Jay Hacker Sep 15 '11 at 13:12
  • 1
    @Jay But here's the problem... it is equally easy to solve this problem in Scala in a non-functional manner. No one is asking how to solve it, the question is how to solve it in a pure functional way. That is even more important than the language, as a matter of fact. – Daniel C. Sobral Sep 15 '11 at 18:09
1

Inspired by @schmichael's valiant effort at a functional Python solution, here is my attempt at pushing things too far. I'm not claiming it's maintainable, efficient, exemplary, or scrutable, but it is functional:

from itertools import imap, groupby, izip, chain
from collections import deque
from operator import itemgetter, methodcaller
from functools import partial

def shifty_csv_dicts(lines):
    last = lambda seq: deque(seq, maxlen=1).pop()
    parse_header = lambda header: header[1:-1].split(',')
    parse_row = lambda row: row.rstrip('\n').split(',')
    mkdict = lambda keys, vals: dict(izip(keys,vals))
    headers_then_rows = imap(itemgetter(1), groupby(lines, methodcaller('startswith', '#')))
    return chain.from_iterable(imap(partial(mkdict, parse_header(last(headers))), imap(parse_row, next(headers_then_rows))) for headers in headers_then_rows)

Okay, let's unpack that.

The basic insight is to (ab)use itertools.groupby to recognize changes from headers to data rows. We use argument evaluation semantics to control the order of operations.

First, we tell groupby to group lines by whether or not they start with '#':

methodcaller('startswith', '#')

creates a function that takes a line and calls line.startswith('#') (it is equivalent to the stylistically preferable but less efficient lambda line: line.startswith('#')).

So groupby takes the incoming iterable of lines and alternates between returning an iterable of header lines (usually just one header), and an iterable of data rows. It actually returns a tuple of (group_val, group_iter), where in this case group_val is a bool indicating whether it's a header. So, we do the equivalent of (group_val, group_iter)[1] on all of the tuples to pick out the iterators: itemgetter(1) is just a function that runs "[1]" on whatever you give it (again equivalent to but more efficient than lambda t: t[1]). So we use imap to run our itemgetter function on every tuple returned by groupby to pick out the header / data iterators:

imap(itemgetter(1), groupby(lines, methodcaller('startswith', '#')))

We evaluate that expression first and give it a name because we will use it twice later, first for the headers, then for the data. The outermost call:

chain.from_iterable(... for headers in headers_then_rows)

steps through the iterators returned from groupby. We are being sly and calling the value headers because some other code inside the ... picks off the rows when we're not looking, advancing the groupby iterator in the process. This outer generator expression will only ever produce headers (remember, they alterate: headers, data, headers, data...). The trick is to make sure the headers get consumed before the rows, because they both share the same underlying iterator. chain.from_iterable just stitches the results of all the data rows iterators together into One Iterator To Return Them All.

So what are we stitching together? Well, we need to take the (last) header, zip it with each row of values, and make a dicts out of that. This:

last = lambda seq: deque(seq, maxlen=1).pop()

is a somewhat dirty but efficient hack to get the last item from an iterator, in this case our header line. We then parse the header by trimming the leading # and trailing newline, and split on , to get a list of column names:

parse_header = lambda header: header[1:-1].split(',')

But, we only want to do this once for each rows iterator, because it exhausts our headers iterator (and we wouldn't want to copy it into some mutable state now, would we?). We also have to ensure that the headers iterator gets used before the rows. The solution is to make a partially applied function, evaluating and fixing the headers as the first parameter, and taking a row as second parameter:

partial(mkdict, parse_header(last(headers)))

The mkdict function uses the column names as keys and row data as values to make a dict:

mkdict = lambda keys, vals: dict(izip(keys,vals))

This gives us a function that freezes the first parameter (keys) and lets you just pass the second parameter (vals): just what we need for creating a bunch of dicts with the same keys and different values.

To use it, we parse each row like you'd expect:

parse_row = lambda row: row.rstrip('\n').split(',')

recalling that next(headers_then_rows) will return the data rows iterator from groupby (since we already used the headers iterator):

imap(parse_row, next(headers_then_rows))

Finally, we map our partially applied dict-maker function to the parsed rows:

imap(partial(...), imap(parse_row, next(headers_then_rows)))

And these are all stitched together by chain.from_iterable to make one, big, happy, functional stream of shifty CSV dicts.

For the record, this could probably be simplified, and I would still do things @schmichael's way. But I learned things figuring this out, and I will try applying these ideas to a Scala solution.

Jay Hacker
  • 1,835
  • 2
  • 18
  • 23
0

EDIT: Scratch that, I don't think you need monads

GClaramunt
  • 3,148
  • 1
  • 21
  • 35