When I need to read millions of rows from a PostgreSQL database through the JDBC driver, I always use a cursor; otherwise I get an OutOfMemoryError. Here is the pattern (in pseudocode) that I use:

begin transaction
execute("declare cursor...")
while (true) {
  boolean processedSomeRows = false
  resultSet = executeQuery("fetch forward...")
  while (resultSet.next()) {
    processedSomeRows = true
    ...
  }
  if (!processedSomeRows) break
}
close cursor
commit

Here is a more "functional" equivalent that I came up with in Scala:

begin transaction
execute("declare cursor...")

@tailrec
def loop(resultSet: ResultSet,
         processed: Boolean): Boolean = {
  if (!resultSet.next()) processed
  else {
    // Process current result set row
    loop(resultSet, true)
  }
}

while (loop(executeQuery("fetch forward..."), false))
  ; //Empty loop

close cursor
commit

I know this is contrived, but is there a better way without resorting to mutability? If I were trying to do this in Haskell, I might come up with a solution that involves monads, but I don't want to send my mind down those "twisty little passages, all alike," because it might never return...

Ralph
  • My immediate reaction is that an Iterator[ResultSet] feeding a .map() over your "process current result set" function would be part of a cleaner solution. Is there any reason you've rejected that approach? – timday Oct 08 '12 at 18:05
  • I usually use either `Iterator`s or `Stream`s to process `ResultSet`s, but I can't think of a way to stop the iteration (`processed` argument above). – Ralph Oct 09 '12 at 12:14
  • Use a takeWhile on the Iterator, and it will stop drawing new values as soon as the predicate you pass it returns false. Is this question actually about how to execute cleanup code once the Iterator is finished? – timday Oct 09 '12 at 12:48
  • I'll have to think about that (using `takeWhile`) to see if I can terminate the outer `while` loop. – Ralph Oct 09 '12 at 13:32
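
The takeWhile suggestion in the comments above could look roughly like the following. This is only a sketch: the object name `CursorRows` and the helpers `rows` and `allRows` are hypothetical names chosen for illustration, and `fetch` stands in for whatever runs the "fetch forward..." query. The outer `takeWhile(_.nonEmpty)` plays the role of the `processed` flag by stopping at the first batch that has no rows.

```scala
import java.sql.ResultSet

object CursorRows {
  // Lazily walks one ResultSet: each element is produced by calling next()
  // and then applying `extract` to the current row.
  def rows[A](resultSet: ResultSet)(extract: ResultSet => A): Iterator[A] =
    Iterator.continually(resultSet).takeWhile(_.next()).map(extract)

  // Calls `fetch` repeatedly and stops at the first batch that contains
  // no rows, which replaces the outer while loop.
  def allRows[A](fetch: () => ResultSet)(extract: ResultSet => A): Iterator[A] =
    Iterator
      .continually(rows(fetch())(extract).toVector) // materialize one batch at a time
      .takeWhile(_.nonEmpty)
      .flatMap(batch => batch.iterator)
}
```

Because a ResultSet is a mutable cursor, `extract` should copy the values it needs out of the current row (for example `_.getInt(1)`) rather than return the ResultSet itself. The resulting Iterator is lazy, so it has to be consumed before the cursor is closed and the transaction is committed.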

2 Answers

Here is a Scala solution that I came up with:

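import java.sql.ResultSet
import scala.annotation.tailrec

@tailrec
def processCursor(query: => ResultSet)(process: ResultSet => Unit): Unit = {
  // Consumes one batch; returns true if it contained at least one row.
  @tailrec
  def loop(resultSet: ResultSet, processed: Boolean): Boolean = {
    if (!resultSet.next()) processed
    else {
      process(resultSet)
      loop(resultSet, true)
    }
  }
  // `query` is by-name, so each recursive call runs the fetch again.
  if (loop(query, false)) processCursor(query)(process)
}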

Call it like this:

begin transaction
execute("declare cursor...")

processCursor(statement.executeQuery("fetch forward...")) {
  resultSet =>
  // process current row of the ResultSet
}

close cursor
commit

How can this be improved?
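
One possible improvement concerns the surrounding steps rather than the loop: making sure "close cursor" and "commit" (or a rollback) happen even when processing throws. The following is a minimal sketch under assumptions, not a definitive implementation: `CursorTransaction`, `withCursor`, `declareSql`, and `closeSql` are hypothetical names, and the SQL strings are supplied by the caller.

```scala
import java.sql.{Connection, Statement}

object CursorTransaction {
  // Runs `body` between the "declare cursor" and "close cursor" statements
  // inside one transaction. Commits on success, rolls back if anything
  // throws, and always closes the Statement.
  def withCursor[A](connection: Connection, declareSql: String, closeSql: String)(
      body: Statement => A): A = {
    connection.setAutoCommit(false) // the first statement then opens a transaction
    val statement = connection.createStatement()
    try {
      statement.execute(declareSql)
      val result = body(statement)
      statement.execute(closeSql)
      connection.commit()
      result
    } catch {
      case e: Throwable =>
        connection.rollback() // abandon the transaction, then rethrow
        throw e
    } finally statement.close()
  }
}
```

With such a wrapper, the call to processCursor would go inside `body`, using the Statement that `withCursor` passes in to run the "fetch forward..." query.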

Ralph

An alternative based on @Ralph's answer:

def processCursor[T](resultSet: ResultSet)(process: ResultSet => T): Seq[T] = {
  // Collects every result in memory; Vector keeps each append cheap.
  @tailrec
  def loop(acc: Vector[T], resultSet: ResultSet): Vector[T] = {
    if (resultSet.next()) loop(acc :+ process(resultSet), resultSet)
    else acc
  }
  loop(Vector.empty, resultSet)
}

Fede