8

I have very large Iterators that I want to split into pieces. I have a predicate that looks at an item and returns true if it is the start of a new piece. I need the pieces to be Iterators, because even the pieces will not fit into memory. There are so many pieces that I would be wary of a recursive solution overflowing the stack. The situation is similar to this question, but I need Iterators instead of Lists, and the "sentinels" (items for which the predicate is true) occur at the beginning of a piece and should be included in it. The resulting iterators will only be used in order, though some may not be used at all, and they should only use O(1) memory. I imagine this means they should all share the same underlying iterator. Performance is important.

If I were to take a stab at a function signature, it would be this:

def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] = ...

I would have loved to use takeWhile, but it loses the last element. I investigated span, but it buffers results. My current best idea involves BufferedIterator, but maybe there is a better way.
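To illustrate the BufferedIterator idea, here is a minimal sketch (not a full solution). It assumes a hypothetical helper named `untilNext`, which peeks at the next element through `head` so that the element that ends a piece is not consumed.

```scala
object PeekDemo {
  // Hypothetical helper: yields elements of `base` until `stop` holds for the
  // upcoming element. It only peeks via `head`, so that element stays in `base`.
  def untilNext[T](base: BufferedIterator[T])(stop: T => Boolean): Iterator[T] =
    new Iterator[T] {
      def hasNext: Boolean = base.hasNext && !stop(base.head)
      def next(): T = if (hasNext) base.next() else Iterator.empty.next()
    }

  val base: BufferedIterator[Int] = Iterator(1, 2, 3, 4, 5).buffered
  // Everything before the first element that is >= 3
  val firstPiece: List[Int] = untilNext(base)(_ >= 3).toList
  // The stopping element was only peeked at, so it is still available
  val stillThere: Int = base.head
}
```

Because the wrapper only buffers a single element, it stays within the O(1) memory budget.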

You'll know you've got it right because something like this doesn't crash your JVM:

groupby((1 to Int.MaxValue).iterator)(_ % (Int.MaxValue / 2) == 0).foreach(group => println(group.sum))
groupby((1 to Int.MaxValue).iterator)(_ % 10 == 0).foreach(group => println(group.sum))
Jay Hacker
  • See http://stackoverflow.com/questions/5410846/how-do-i-apply-the-pimp-my-library-pattern-to-scala-collections/5411133#5411133 – huynhjl Nov 22 '11 at 21:21

5 Answers

7

You have an inherent problem. Iterable implies that you can get multiple iterators. Iterator implies that you can only pass through once. That means that your Iterable[Iterable[T]] should be able to produce Iterator[Iterable[T]]s. But when this returns an element--an Iterable[T]--and that asks for multiple iterators, the underlying single iterator can't comply without either caching the results of the list (too big) or calling the original iterable and going through absolutely everything again (very inefficient).
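As a small illustration of that difference (the object and value names are made up for this example), an Iterable hands out a fresh iterator on every call, while a single Iterator is spent after one pass:

```scala
object OnePassDemo {
  val xs: Iterable[Int] = List(1, 2, 3)

  // Each call to `iterator` starts a new traversal from the beginning
  val firstPass: Int  = xs.iterator.sum
  val secondPass: Int = xs.iterator.sum

  // A single Iterator can only be walked once
  val it: Iterator[Int] = xs.iterator
  val onlyPass: Int = it.sum
  val exhausted: Boolean = !it.hasNext
}
```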

So, while you could do this, I think you should conceive of your problem in a different way.

If you could start with a Seq instead, you could grab subsets as ranges.

If you already know how you want to use your iterable, you could write a method

def process[T](source: Iterable[T])(starts: T => Boolean)(handlers: T => Unit *)

which increments through the set of handlers each time starts fires off a "true". If there's any way you can do your processing in one sweep, something like this is the way to go. (Your handlers will have to save state via mutable data structures or variables, however.)
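A rough sketch of what such a `process` method might look like follows. Two details are my assumptions rather than part of the description above: the first element always starts the first group, and elements that arrive after the handlers run out are silently ignored.

```scala
object ProcessDemo {
  // Sketch only: routes each element to the handler for its group, in one sweep.
  def process[T](source: Iterable[T])(starts: T => Boolean)(handlers: (T => Unit)*): Unit = {
    var current = -1 // index of the active handler; -1 means nothing seen yet
    for (item <- source) {
      // Assumption: the very first element opens group 0 even if starts(item) is false
      if (current < 0 || starts(item)) current += 1
      // Assumption: groups beyond the last handler are dropped
      if (current < handlers.length) handlers(current)(item)
    }
  }

  // Handlers keep their state in mutable variables
  var firstSum = 0
  var secondSum = 0
  process(1 to 6)(_ % 4 == 0)(x => firstSum += x, x => secondSum += x)
}
```

Here 1, 2 and 3 go to the first handler, and 4, 5 and 6 go to the second.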

If you can permit iteration on the outer list to break the inner list, you could have an Iterable[Iterator[T]] with the additional constraint that once you iterate to a later sub-iterator, all previous sub-iterators are invalid.


Here's a solution of the last type (from Iterator[T] to Iterator[Iterator[T]]; one can wrap this to make the outer layers Iterable instead).

class GroupedBy[T](source: Iterator[T])(starts: T => Boolean)
extends Iterator[Iterator[T]] {
  private val underlying = source
  private var saved: T = _       // one-element lookahead, valid only while `cached` is true
  private var cached = false     // true if `saved` has been read but not yet handed out
  private var starting = false   // true if `saved` begins a new group
  private def cacheNext(): Unit = {
    saved = underlying.next()
    starting = starts(saved)
    cached = true
  }
  // Returns Nothing so that it can be used where a T is expected
  private def oops(): Nothing = throw new java.util.NoSuchElementException("empty iterator")
  // Comment the next line if you do NOT want the first element to always start a group
  if (underlying.hasNext) { cacheNext(); starting = true }
  def hasNext: Boolean = {
    // Skip whatever is left of the current group
    while (!(cached && starting) && underlying.hasNext) cacheNext()
    cached && starting
  }
  def next(): Iterator[T] = {
    if (!(cached && starting) && !hasNext) oops()
    starting = false
    new Iterator[T] {
      var presumablyMore = true
      def hasNext: Boolean = {
        if (!cached && !starting && underlying.hasNext && presumablyMore) cacheNext()
        presumablyMore = cached && !starting
        presumablyMore
      }
      def next(): T = {
        if (presumablyMore && (cached || hasNext)) {
          cached = false
          saved
        }
        else oops()
      }
    }
  }
}
Rex Kerr
    `Iterator[Iterator[T]]` would be fine; my underlying iterator can only and should only allow one pass anyway. I want skipping sub-iterators to invalidate previous sub-iterators. I don't know the length ahead of time, so a `Seq` isn't possible. I do know how I want to use my iterable, but I thought such a function would be useful generally. – Jay Hacker Nov 23 '11 at 14:34
6

Here's my solution using BufferedIterator. It doesn't let you skip iterators correctly, but it's fairly simple and functional. The first element(s) go into a group even if !startsGroup(first).

def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] =
  new Iterator[Iterator[T]] {
    val base = iter.buffered
    override def hasNext = base.hasNext  
    override def next() = Iterator(base.next()) ++ new Iterator[T] {
      override def hasNext = base.hasNext && !startsGroup(base.head) 
      override def next() = if (hasNext) base.next() else Iterator.empty.next()
    }
  }
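To make the skipping limitation concrete, here is a small check. It repeats the function above under the hypothetical name `groupbyNoSkip`, only so that the example is self-contained. Consuming the groups in order behaves as intended, but skipping a group without draining it leaves the next group starting in the wrong place.

```scala
object SkipDemo {
  // Same logic as the function above; the name is hypothetical
  def groupbyNoSkip[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] =
    new Iterator[Iterator[T]] {
      val base = iter.buffered
      override def hasNext = base.hasNext
      override def next() = Iterator(base.next()) ++ new Iterator[T] {
        override def hasNext = base.hasNext && !startsGroup(base.head)
        override def next() = if (hasNext) base.next() else Iterator.empty.next()
      }
    }

  // Consuming every group in order gives the intended split
  val inOrder: List[List[Int]] =
    groupbyNoSkip(Iterator(1, 2, 3, 4, 5, 6))(_ % 3 == 0).map(_.toList).toList

  // Skipping the first group without draining it: the second group then
  // begins at 2, not at the sentinel 3
  val groups = groupbyNoSkip(Iterator(1, 2, 3, 4, 5, 6))(_ % 3 == 0)
  groups.next() // skipped, only its first element was consumed
  val afterSkip: List[Int] = groups.next().toList
}
```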

Update: Keeping a little state lets you skip iterators and prevent people from messing with previous ones:

def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] =
  new Iterator[Iterator[T]] {
    val base = iter.buffered
    var prev: Iterator[T] = Iterator.empty
    override def hasNext = base.hasNext
    override def next() = {
      while (prev.hasNext) prev.next()        // Exhaust previous iterator; take* and drop* do NOT always work!!  (Jira SI-5002?)
      prev = Iterator(base.next()) ++ new Iterator[T] {
        var hasMore = true
        override def hasNext = { hasMore = hasMore && base.hasNext && !startsGroup(base.head) ; hasMore }
        override def next() = if (hasNext) base.next() else Iterator.empty.next()
      }
      prev
    }
  }
Jay Hacker
2
import scala.collection.mutable.ArrayBuffer

object GroupingIterator {

  /**
   * Create a new GroupingIterator with a grouping predicate.
   *
   * @param it The original iterator
   * @param p Predicate controlling the grouping
   * @tparam A Type of elements iterated
   * @return A new GroupingIterator
   */
  def apply[A](it: Iterator[A])(p: (A, IndexedSeq[A]) => Boolean): GroupingIterator[A] =
    new GroupingIterator(it)(p)
}

/**
 * Group elements in sequences of contiguous elements that satisfy a predicate. The predicate
 * tests each single potential next element of the group with the help of the elements grouped so far.
 * If it returns true, the potential next element is added to the group, otherwise
 * a new group is started with the potential next element as first element
 *
 * @param self The original iterator
 * @param p Predicate controlling the grouping
 * @tparam A Type of elements iterated
 */
class GroupingIterator[+A](self: Iterator[A])(p: (A, IndexedSeq[A]) => Boolean) extends Iterator[IndexedSeq[A]] {

  private[this] val source = self.buffered
  private[this] val buffer: ArrayBuffer[A] = ArrayBuffer()

  def hasNext: Boolean = source.hasNext

  def next(): IndexedSeq[A] = {
    if (hasNext)
      nextGroup()
    else
      Iterator.empty.next()
  }

  private[this] def nextGroup(): IndexedSeq[A] = {
    assert(source.hasNext)

    buffer.clear()
    buffer += source.next

    while (source.hasNext && p(source.head, buffer)) {
      buffer += source.next
    }

    buffer.toIndexedSeq
  }
}
piotr
2

If memory is the constraint, the following will work, but only if your underlying iterable object supports views. This implementation iterates over the Iterable and generates IterableViews, which can then be iterated over. The very first element always starts a group, whether or not startsGroup holds for it.

def groupby[T](iter: Iterable[T])(startsGroup: T => Boolean): Iterable[Iterable[T]] = new Iterable[Iterable[T]] {
  def iterator = new Iterator[Iterable[T]] {
    val i = iter.iterator
    var index = 0
    var nextView: IterableView[T, Iterable[T]] = getNextView()
    private def getNextView() = {
      val start = index
      var hitStartGroup = false
      while ( i.hasNext && ! hitStartGroup ) {
        val next = i.next()
        index += 1
        hitStartGroup = ( index > 1 && startsGroup( next ) )
      }
      if ( hitStartGroup ) {
        if ( start == 0 ) iter.view( start, index - 1 )
        else iter.view( start - 1, index - 1 )
      } else { // hit end
        if ( start == index ) null
        else if ( start == 0 ) iter.view( start, index )
        else iter.view( start - 1, index )
      }
    }
    def hasNext = nextView != null
    def next() = {
      if ( nextView != null ) {
        val next = nextView
        nextView = getNextView()
        next
      } else null
    }
  }
}
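For readers unfamiliar with views, here is a minimal sketch of the slicing that the answer relies on. It uses `view.slice(from, until)` rather than `view(from, until)`. As far as I know the two are equivalent and the two-argument `view` is deprecated in newer Scala versions, but treat that as an assumption to check against your version. The names are made up for the example.

```scala
object ViewDemo {
  val data: Iterable[Int] = 1 to 10

  // A lazy window over the elements at indices 2, 3 and 4;
  // no elements are copied until the view is traversed
  val piece = data.view.slice(2, 5)

  val contents: List[Int] = piece.toList
}
```

Note that a view over a plain Iterable still has to walk the source up to the start index each time it is traversed.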
Neil Essy
2

You can maintain a low memory footprint by using Streams. Use result.toIterator if you need an iterator again.

With streams, there's no mutable state, only a single conditional and it's nearly as concise as Jay Hacker's solution.

 def batchBy[A,B](iter: Iterator[A])(f: A => B): Stream[(B, Iterator[A])] = {
    val base = iter.buffered
    val empty = Stream.empty[(B,  Iterator[A])]

    def getBatch(key: B) = {
      Iterator(base.next()) ++ new Iterator[A] {
        def hasNext: Boolean = base.hasNext && (f(base.head) == key)
        def next(): A = base.next()
      }
    }

    def next(skipList: Option[Iterator[A]] = None): Stream[(B, Iterator[A])] = {
      skipList.foreach{_.foreach{_=>}}

      if (base.isEmpty) empty
      else {
        val key = f(base.head)
        val batch = getBatch(key)

        Stream.cons((key, batch), next(Some(batch)))
      }
    }

    next()
  }
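The recursive call to `next` above does not recurse eagerly, because the second argument of `Stream.cons` is passed by name and only evaluated when the tail is forced. A tiny sketch of the same pattern, with a hypothetical `from` function (in Scala 2.13 and later, `LazyList` is the non-deprecated replacement for `Stream`):

```scala
object LazyTailDemo {
  // The recursive call sits in the by-name tail of Stream.cons, so building
  // one cell does not trigger construction of the following cell
  def from(n: Int): Stream[Int] = Stream.cons(n, from(n + 1))

  // Only as many cells as are needed get built, so this terminates
  val firstThree: List[Int] = from(1).take(3).toList
}
```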

I ran the tests:

scala> batchBy((1 to Int.MaxValue).iterator)(_ % (Int.MaxValue / 2) == 0)
         .foreach{case(_,group) => println(group.sum)}
-1610612735
1073741823
-536870909
2147483646
2147483647

The second test prints too much to paste to Stack Overflow.

agarman