Suppose I have multiple iterators, each of which is ordered. If I wanted to merge these iterators while globally ordering them (e.g. [(1,3,4), (2,4,5)] -> [1,2,3,4,4,5]) using Monix, how would I do it?

I don't know Monix; are these just standard Scala iterators? – The Archetypal Paul Jul 18 '16 at 20:57
@TheArchetypalPaul: Yes, just standard Scala iterators. – BasilTomato Jul 18 '16 at 21:01
Right. Answer coming. – The Archetypal Paul Jul 18 '16 at 21:02
3 Answers
Doesn't use Monix, but I'm not sure if that's relevant.
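```scala
import scala.collection.BufferedIterator

// Merges several individually sorted iterators into one globally sorted iterator.
def merge[A: Ordering](xs: Seq[Iterator[A]]): Iterator[A] =
  new Iterator[A] {
    // Buffered iterators let us peek at `head` without consuming it.
    val its: Seq[BufferedIterator[A]] = xs.map(_.buffered)

    def hasNext: Boolean = its.exists(_.hasNext)

    // Among the non-exhausted iterators, advance the one with the smallest head.
    def next(): A = its.filter(_.hasNext).minBy(_.head).next()
  }

val ys = merge(Seq(List(1, 3, 5).iterator, List(2, 4, 6).iterator, List(10, 11).iterator))
ys.toList //> res0: List[Int] = List(1, 2, 3, 4, 5, 6, 10, 11)
```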
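A possible refinement, not part of the answer above: scanning every iterator with `minBy` costs O(k) per element for k iterators. A min-heap keyed on each iterator's current head brings that down to O(log k). The sketch below swaps in the standard library's `scala.collection.mutable.PriorityQueue` for the linear scan; `HeapMerge` and `mergeWithHeap` are hypothetical names chosen for illustration.

```scala
import scala.collection.BufferedIterator
import scala.collection.mutable

object HeapMerge {
  // Hypothetical helper: k-way merge of individually sorted iterators using a min-heap.
  def mergeWithHeap[A](xs: Seq[Iterator[A]])(implicit ord: Ordering[A]): Iterator[A] =
    new Iterator[A] {
      // mutable.PriorityQueue is a max-heap, so reverse the ordering on heads.
      private val heap = mutable.PriorityQueue.empty[BufferedIterator[A]](
        Ordering.by[BufferedIterator[A], A](_.head)(ord).reverse
      )
      // Only non-empty iterators enter the heap, so calling `head` on them is safe.
      xs.map(_.buffered).filter(_.hasNext).foreach(it => heap.enqueue(it))

      def hasNext: Boolean = heap.nonEmpty

      def next(): A = {
        if (heap.isEmpty) throw new NoSuchElementException("next on empty iterator")
        // Remove the iterator with the smallest head before advancing it.
        val it = heap.dequeue()
        val value = it.next()
        // Put it back only if it has more elements; its new head decides its position.
        if (it.hasNext) heap.enqueue(it)
        value
      }
    }
}
```

Dequeuing the iterator before calling `next()` matters: the heap orders iterators by their current `head`, so an iterator must not change its head while it is still inside the heap.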

Thanks, I did want to know how it can be done using Monix (or one of those reactive programming libraries), but it's still useful. – BasilTomato Jul 18 '16 at 21:35
Nicely done. Since `ord` is unused couldn't you have declared `merge[A:Ordering]`? – jwvh Jul 18 '16 at 21:37
I'm afraid I don't have time to look into Monix, but I'm not sure how merging iterators and Monix are connected. Can you expand, maybe in your question? – The Archetypal Paul Jul 18 '16 at 21:39
@jwh, yes. I forgot about that. Edited. Also, with a bit more thought the tuple stuff was pointless, so I simplified things. – The Archetypal Paul Jul 18 '16 at 21:40
@TheArchetypalPaul: I guess I should have said `Observable` instead of iterators (though they are similar in that they both represent a stream). I wanted to use Monix because in theory it lets you tune how the work gets executed, unlike the standard library. For example, imagine each iterator's `next` method involving a fair amount of calculation. Monix should allow this to be done on multiple threads. – BasilTomato Jul 19 '16 at 05:14
Thanks. That's why I asked if they were standard iterators before answering :) to understand what you were working with. If I get time I'll take a look at Monix Observables. – The Archetypal Paul Jul 19 '16 at 06:37
Since an observable is a stream of items, it falls into one of two types:
- Finite streams
- Infinite streams
Note that, in order to sort correctly, you'll need all the items, so there's no easy way to do this.
For finite streams, you'll have to accumulate all the items and then sort them. You can turn the result back into an observable with `Observable.fromIterable`.
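```scala
import monix.eval.Task
import monix.reactive.Observable

val items = List((1, 3, 4), (2, 4, 5))
val sortedList: Task[List[Int]] = Observable
  .fromIterable(items)
  .flatMap(item => Observable.fromIterable(List(item._1, item._2, item._3))) // Flatten to an Observable[Int]
  .toListL       // Collect all items into a Task[List[Int]]
  .map(_.sorted) // Sort once every item is available
// Back to an Observable[Int]
val sortedObs: Observable[Int] = Observable.fromTask(sortedList).flatMap(Observable.fromIterable(_))
```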
For infinite streams, the only thing you can do is buffer the items up to a certain time or size and sort each buffer. I don't see any way around this, since you don't know when the stream will end.
For example:
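```scala
import monix.reactive.Observable

val itemsStream: Observable[(Int, Int, Int)] = ??? // your infinite source
itemsStream
  .bufferIntrospective(10) // Observable[List[(Int, Int, Int)]], at most 10 items per buffer
  .flatMap { (itemList: List[(Int, Int, Int)]) =>
    // Sort within each buffer; ordering is only guaranteed inside a single buffer
    Observable.fromIterable(itemList.flatMap(t => List(t._1, t._2, t._3)).sorted)
  }
```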
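To see what buffer-and-sort gives you, here is a standard-library sketch in which `Iterator.grouped` stands in for a fixed-size buffer. This is an approximation, not Monix: as I understand it, `bufferIntrospective` emits buffers of *up to* `maxSize` depending on downstream back-pressure, so its chunk boundaries are not fixed. `ChunkSort.sortInChunks` is a hypothetical helper. Each chunk comes out sorted, but the stream as a whole does not.

```scala
object ChunkSort {
  // Hypothetical helper: sorts an (possibly infinite) iterator only within
  // fixed-size windows. An element in a later chunk can still be smaller than
  // one in an earlier chunk, so the output is not globally ordered.
  def sortInChunks[A: Ordering](it: Iterator[A], chunkSize: Int): Iterator[A] = {
    require(chunkSize > 0, "chunkSize must be positive")
    // grouped pulls one chunk at a time, so this stays lazy on infinite input.
    it.grouped(chunkSize).flatMap(chunk => chunk.sorted)
  }
}
```

For example, `sortInChunks(Iterator(5, 1, 4, 2, 3, 0), 3)` yields 1, 4, 5, 0, 2, 3: the 0 arrives after the 5 because it landed in the second chunk.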

A bit late, but I needed to merge-sort Monix observables and couldn't find a solution, so this is how I solved it.
The idea is to use `bufferWhile` on the two `head`s of the lists, so that you have the actual values at hand and can check which one is smaller:
```scala
import monix.reactive.Observable

// aHead/bHead hold (at most) the current head of each list; aTail/bTail hold the rest.
private def mergeSortInternal(aHead: Observable[Int], bHead: Observable[Int],
                              aTail: Observable[Int], bTail: Observable[Int]): Observable[Int] = {
  // Gather both emptiness flags into a single Seq(aIsEmpty, bIsEmpty)
  (aHead.isEmpty ++ bHead.isEmpty).bufferWhile(_ => true).map { (isEmptyPair: Seq[Boolean]) =>
    val Seq(aIsEmpty, bIsEmpty) = isEmptyPair
    if (aIsEmpty) {
      bHead ++ bTail // a is exhausted: the rest of b is already sorted
    } else if (bIsEmpty) {
      aHead ++ aTail // b is exhausted: the rest of a is already sorted
    } else {
      // Both non-empty: gather the two head values and emit the smaller one first
      (aHead ++ bHead).bufferWhile(_ => true).map { (heads: Seq[Int]) =>
        val Seq(aHeadValue, bHeadValue) = heads
        if (aHeadValue < bHeadValue)
          Observable(aHeadValue) ++ mergeSortInternal(aTail.head, bHead, aTail.tail, bTail)
        else
          Observable(bHeadValue) ++ mergeSortInternal(aHead, bTail.head, aTail, bTail.tail)
      }.flatten
    }
  }.flatten
}

private def mergeSort(a: Observable[Int], b: Observable[Int]): Observable[Int] =
  mergeSortInternal(a.head, b.head, a.tail, b.tail)
```
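For comparison, the same head/tail recursion as `mergeSortInternal` can be written with the standard library's `LazyList` (Scala 2.13+) instead of Monix. This is a single-threaded, pull-based sketch rather than a Monix solution, and `LazyMerge.mergeSorted` is a hypothetical name. Because both inputs are already sorted, only the two heads are ever compared, so it also works on infinite sorted inputs as long as you only force a finite prefix.

```scala
object LazyMerge {
  // Hypothetical helper: compare the two heads, emit the smaller one,
  // and recurse lazily on the remainder (#:: defers the recursive call).
  def mergeSorted[A](a: LazyList[A], b: LazyList[A])(implicit ord: Ordering[A]): LazyList[A] =
    if (a.isEmpty) b
    else if (b.isEmpty) a
    else if (ord.lt(a.head, b.head)) a.head #:: mergeSorted(a.tail, b)
    else b.head #:: mergeSorted(a, b.tail)
}
```

Ties go to `b`, matching the strict `<` comparison in `mergeSortInternal`.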
