
Suppose I have multiple ordered iterators. Using Monix, how would I merge them into a single, globally ordered sequence (e.g. [(1,3,4), (2,4,5)] -> [1,2,3,4,4,5])?

BasilTomato

3 Answers


This doesn't use Monix, but I'm not sure that's relevant:

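import scala.collection.BufferedIterator

// Merges already-sorted iterators: each call to next() emits the smallest head among the non-empty inputs
def merge[A:Ordering](xs: Seq[Iterator[A]]): Iterator[A] =
  new Iterator[A] {
    val its: Seq[BufferedIterator[A]] = xs.map(_.buffered) // buffered lets us peek at head without consuming it
    def hasNext: Boolean = its.exists(_.hasNext)
    def next(): A = {
      if (!hasNext) throw new NoSuchElementException("next on empty iterator")
      its.filter(_.hasNext)
         .minBy(_.head)
         .next()
    }
  }

val ys = merge(Seq(List(1,3,5).iterator, List(2,4,6).iterator, List(10,11).iterator))

ys.toList  //> res0: List[Int] = List(1, 2, 3, 4, 5, 6, 10, 11)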
The Archetypal Paul
  • Thanks. I did want to know how it can be done using Monix (or one of those reactive programming libraries), but it's still useful. – BasilTomato Jul 18 '16 at 21:35
  • Nicely done. Since `ord` is unused couldn't you have declared `merge[A:Ordering]`? – jwvh Jul 18 '16 at 21:37
  • I'm afraid I don't have time to look into Monix, but I'm not sure how merging iterators and Monix are connected. Can you expand, maybe in your question? – The Archetypal Paul Jul 18 '16 at 21:39
  • @jwvh, yes. I forgot about that. Edited. Also, the tuple stuff was pointless with a bit more thought, so I simplified things. – The Archetypal Paul Jul 18 '16 at 21:40
  • @TheArchetypalPaul: I guess I should have said `Observable` instead of iterators (though they are similar in that both represent a stream). I wanted to use Monix because, in theory, it lets you tune how the work is executed, unlike the standard library. For example, imagine each iterator's `next` method involving a fair amount of calculation. Monix should allow this to be done in multiple threads. – BasilTomato Jul 19 '16 at 05:14
  • Thanks. That's why I asked if they were standard iterators before answering :) to understand what you were working with. If I get time I'll take a look at Monix Observables. – The Archetypal Paul Jul 19 '16 at 06:37
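The `filter`/`minBy` scan in the `merge` answer above looks at every input on each call to `next`, so producing n elements from k iterators costs O(n·k). A sketch of the same idea with a min-heap (`scala.collection.mutable.PriorityQueue`) brings that to O(n log k). `mergeWithHeap` is a hypothetical name, and like the original it assumes every input iterator is already sorted:

```scala
import scala.collection.{BufferedIterator, mutable}

// Hypothetical helper: k-way merge of already-sorted iterators using a min-heap.
// Each heap entry is a BufferedIterator, ordered by its current head.
def mergeWithHeap[A](xs: Seq[Iterator[A]])(implicit ord: Ordering[A]): Iterator[A] =
  new Iterator[A] {
    // PriorityQueue is a max-heap, so reverse the ordering to dequeue the smallest head first
    private val heap = mutable.PriorityQueue.empty[BufferedIterator[A]](
      Ordering.by[BufferedIterator[A], A](_.head).reverse)
    xs.map(_.buffered).filter(_.hasNext).foreach(heap.enqueue(_))

    def hasNext: Boolean = heap.nonEmpty

    def next(): A = {
      if (heap.isEmpty) throw new NoSuchElementException("next on empty iterator")
      val it = heap.dequeue()          // iterator whose head is currently smallest
      val value = it.next()
      if (it.hasNext) heap.enqueue(it) // re-insert so it is ordered by its new head
      value
    }
  }

val zs = mergeWithHeap(Seq(List(1, 3, 5).iterator, List(2, 4, 6).iterator, List(10, 11).iterator))
println(zs.toList) // List(1, 2, 3, 4, 5, 6, 10, 11)
```

Only the dequeued iterator is advanced, so the heads of the iterators still inside the heap never change while they are being compared.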

An Observable is a stream of items, and streams come in two kinds:

  • Finite streams
  • Infinite streams

Note that to sort arbitrary items correctly you need all of them, so there's no easy way to do this.

For finite streams, accumulate all the items, sort them, and then turn the result back into an observable with `Observable.fromIterable`.

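import monix.eval.Task
import monix.reactive.Observable

val items = List((1,3,4), (2,4,5))

val sortedList: Task[List[Int]] = Observable
  .fromIterable(items)
  .flatMap(item => Observable.fromIterable(List(item._1, item._2, item._3))) // Flatten to an Observable[Int]
  .toListL // Collect every item; this only completes for a finite stream
  .map(_.sorted)

// Turn the sorted list back into an Observable
val sortedObservable: Observable[Int] = Observable.fromTask(sortedList).flatMap(list => Observable.fromIterable(list))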

For infinite streams, the only thing you can do is buffer the items up to a certain time or size and sort each batch. I don't see any way around this, since you don't know when the stream will end.

For example,

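import monix.reactive.Observable

val itemsStream: Observable[(Int, Int, Int)] = ??? // your unbounded source

itemsStream
  .bufferIntrospective(10) // buffers up to 10 tuples while downstream is busy, then emits them as one List
  .flatMap((itemList: List[(Int, Int, Int)]) =>
    // Sort each batch on its own; the ordering only holds within a batch
    Observable.fromIterable(itemList.flatMap(t => List(t._1, t._2, t._3)).sorted)
  )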
atl
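Whichever operator does the buffering (`bufferIntrospective` in the answer above, or a time- or count-based one), each batch is sorted on its own, so the output is ordered only within a batch. The effect is easy to see without Monix. In this sketch a plain `Iterator` and `grouped(3)` stand in for the Observable and the buffering operator; the batch size and input values are made up for illustration:

```scala
// Hypothetical stand-in for an unbounded stream: a plain Iterator instead of a Monix Observable
val unsorted: Iterator[Int] = Iterator(5, 1, 4, 2, 8, 3, 7, 6)

// Sort each batch of (at most) 3 items independently, as the buffering approach does
val batchSorted: List[Int] = unsorted.grouped(3).flatMap(_.sorted).toList

println(batchSorted) // List(1, 4, 5, 2, 3, 8, 6, 7)
```

Here 5 is emitted before 2 because they land in different batches. A larger buffer reduces, but cannot rule out, such inversions.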

A bit late, but I needed to merge-sort Monix observables and couldn't find a solution, so this is how I solved it.

The idea is to use `bufferWhile` on the two heads, so that you hold the actual values in hand and can check which one is smaller.

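  import monix.reactive.Observable

  // Merges two sorted Observables. aHead/bHead hold the current head (one element, or empty);
  // aTail/bTail hold the remaining elements.
  private def mergeSortInternal(aHead: Observable[Int], bHead: Observable[Int], aTail: Observable[Int], bTail: Observable[Int]): Observable[Int] = {
    // bufferWhile(_ => true) collects both isEmpty flags into a single Seq, emitted on completion
    (aHead.isEmpty ++ bHead.isEmpty).bufferWhile(_ => true).map { (isEmptyPair: Seq[Boolean]) =>
      val Seq(aIsEmpty, bIsEmpty) = isEmptyPair
      if (aIsEmpty) {
        bHead ++ bTail // a is exhausted: emit the rest of b
      }
      else if (bIsEmpty) {
        aHead ++ aTail // b is exhausted: emit the rest of a
      }
      else {
        // Both non-empty: fetch the two head values, emit the smaller one, then recurse
        (aHead ++ bHead).bufferWhile(_ => true).map { (heads: Seq[Int]) =>
          val Seq(aHeadValue, bHeadValue) = heads
          if (aHeadValue < bHeadValue) {
            Observable(aHeadValue) ++ mergeSortInternal(aTail.head, bHead, aTail.tail, bTail)
          }
          else {
            Observable(bHeadValue) ++ mergeSortInternal(aHead, bTail.head, aTail, bTail.tail)
          }
        }.flatten
      }
    }.flatten
  }

  private def mergeSort(a: Observable[Int], b: Observable[Int]): Observable[Int] = {
    mergeSortInternal(a.head, b.head, a.tail, b.tail)
  }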

Hagai
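The recursion in `mergeSortInternal` is easier to follow without the Observable plumbing. Here is a plain-Scala sketch of the same step on `LazyList` (Scala 2.13+; `mergeLazy` is a hypothetical name, not part of the answer): check whether either side is empty, compare the two heads, emit the smaller one, and recurse on what is left. Ties take the second input first, as in the answer's `<` comparison.

```scala
// Two-way merge of sorted LazyLists. #:: defers the recursive call,
// so nothing beyond what is consumed gets evaluated.
def mergeLazy(a: LazyList[Int], b: LazyList[Int]): LazyList[Int] =
  if (a.isEmpty) b                                        // a exhausted: rest of b
  else if (b.isEmpty) a                                   // b exhausted: rest of a
  else if (a.head < b.head) a.head #:: mergeLazy(a.tail, b)
  else b.head #:: mergeLazy(a, b.tail)

println(mergeLazy(LazyList(1, 3, 4), LazyList(2, 4, 5)).toList) // List(1, 2, 3, 4, 4, 5)
```

One difference is worth noting. A `LazyList` memoizes its elements. In the Observable version, as far as I can tell, every `head`, `tail` and `isEmpty` is a separate subscription, so a cold source is re-run from its start at each step. That may matter if producing each element is expensive, as in the question's comments.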