0

I'm writing an application server and there is a message sending loop. A message is composed of fields and thus can be viewed as an iterator that iterates over the fields. And there is a message queue that is processed by the message loop, but the loop is breakable at any time (e.g. when the socket buffer is full) and can be resumed later. Current implementation looks like:

private val messageQueue: Queue[Iterator[Field]]

sent = 0
breakable {
  for (iterator <- messageQueue) {
    for (field <- iterator) {
      ... breakable ...
    }
    sent += 1
  }
} finally messageQueue.trimStart(sent)

This works and is not bad, but then I thought I could make the code a bit cleaner if I could replace the queue by an iterator that concatenates iterators using the ++ operator. To say:

private val messageQueue: Iterator[Field] = message1.iterator ++ message2.iterator ++ ...

breakable {
  for (field <- messageQueue) {
    ... breakable ...
  }
}

Now the code looks much cleaner but there's a performance issue. Concatenated iterators form a (unbalanced) tree internally so the next() operation takes O(n) of time. So the iteration takes O(n^2) of time overall.

To summarize, the messages need to be processed just once so the queue doesn't need to be a Traversable. An Iterator (TraversableOnce) would do. I'd like to view the message queue as a collection of consecutive iterators but the ++ has a performance issue. Would there be a nice solution that makes the code cleaner but is efficient at the same time?

K J
  • 4,505
  • 6
  • 27
  • 45

2 Answers2

2

Have you thought about using Stream and #::: to lazily concatenate your messages together?

private val messageQueue: Stream[Field] = message1.toStream #::: message2.toStream #::: ...

breakable {
  for (field <- messageQueue) {
    ... breakable ...
  }
}

As for the time complexity here, I believe it would be O(n) in the number of iterators you're concatenating (you need to call toStream for each iterator and #::: them together). However, the individual toStream and #::: operations should be O(1) since they're lazy. Here's the toStream implementation for Iterator:

def toStream: Stream[A] =
    if (self.hasNext) Stream.cons(self.next, self.toStream)
    else Stream.empty[A]

This will take constant time because the 2nd argument to Stream.cons is call-by-name, so it won't get evaluated until you actually access the tail.

However, the conversion to Stream will add a constant factor of overhead for each element access, i.e. instead of just calling next on the iterator it will have to do a few extra method calls to force the lazy tail of the stream and access the contained value.

DaoWen
  • 32,589
  • 6
  • 74
  • 101
  • Stream is lazy but its append operation still takes O(n) of time, so I'm afraid it has no performance benefit over my approach using iterator. – K J Mar 01 '13 at 16:50
  • Are you sure about that? If that's the case it seems to me like they implemented it wrong... The `.toStream` and `#:::` operations should be constant-time. – DaoWen Mar 01 '13 at 16:52
  • The #::: operation takes O(1) of time to prepend an item but O(n) to append. For example, let's say we add 5 items to the stream in turn. The stream would look like (head=a, tail=(((b c) d) e)). It's okay when we need to get the first element, but as soon as we proceed to the next element, it'll have to go down 4 levels to get b. – K J Mar 01 '13 at 17:04
  • If we know all elements to add from the beginning, it would be okay because we can prepend them in reverse order. But as the name messageQueue implies, items should be appended later, and the append operation takes linear time for Stream.. – K J Mar 01 '13 at 17:08
  • I think you have the nesting wrong there. Since [#::: is right-associative](http://stackoverflow.com/a/1162980/1427124), the result of all the append calls should come out looking like this: (head=a, tail=(head=b, tail=(head=c, tail=...))). However, since the whole thing is done lazily it wouldn't flatten it out right away—otherwise you'd end up with O(n) complexity again. The `#:::` operation just adds a second check for each access, returning the appended Stream once the first Stream is empty. – DaoWen Mar 01 '13 at 17:09
  • That's the case when you do messageQueue = a #:: b #:: c #:: d #:: e. But it's different when you do messageQueue = a; messageQueue = messageQueue #::: b; messageQueue = messageQueue #::: c; and so on. As we don't know all elements to add in the beginning, prepending is not an option.. – K J Mar 01 '13 at 17:11
  • 1
    Yes, you're definitely right about that. If you call `#:::` with successive assignments like that then it's the same as forcing left-associativity by adding parenthesis to original string of concatenations. It's still only O(n) in the number of iterators you've appended since the last time you accessed the head—which is a lot better than O(n) in the total size of all iterators added—but that still might be undesirable. If you're really wanting efficiency you're probably going to be stuck with something more verbose. – DaoWen Mar 01 '13 at 17:20
2

What if you just flatten them?

def flattenIterator[T](l: List[Iterator[T]]): Iterator[T] = l.iterator.flatten
Daniel C. Sobral
  • 295,120
  • 86
  • 501
  • 681
  • This turned out to be a good idea though I came up with a different solution using a flatten queue not iterator. Thanks for the suggestion! – K J Mar 25 '13 at 17:14