The goal is to generalize the merge function from Streaming.Prelude,

merge :: (Monad m, Ord a) => Stream (Of a) m r -> Stream (Of a) m s -> Stream (Of a) m (r, s) 

to an arbitrary number of source streams. The strategy is to use a Data.Heap.Heap of Stream (Of a) m r that is sorted by a. I.e. bigMerge would have the signature

bigMerge :: (Monad m, Ord a) => [Stream (Of a) m r] -> Stream (Of a) m [r]

(The list could just as well be replaced by Heap.)

What I have is a rather evil concoction that isn't quite correct. Here it goes:

For completeness, first the imports:

import qualified Data.Heap as H
import Data.Heap (Heap)
import Data.List (sortBy)
import Data.Function (on)
import Streaming
import qualified Streaming.Prelude as S
import Streaming.Internal (Stream(..))  -- shouldn't!

In order to use Heap an element of class Ord is needed:

data Elt a m r = Elt Int (Maybe a) (Stream(Of a) m r)

The extra Int is introduced to carry along the index of the stream in the input list so that the returned [r] can be built with elements in the right order. The Maybe a carries the current value of the stream.

Eq and Ord instances are:

instance Eq a => Eq (Elt a m r) where
    (Elt i ma _) == (Elt i' ma' _) =
        if i == i' then error "Internal error: Index clash in =="
        else ma == ma'

instance Ord a => Ord (Elt a m r) where
    (Elt i ma s) <= (Elt i' ma' s')
        | i == i'   = error "Internal error: Index clash in <="
        | otherwise = cmp (i, ma, s) (i', ma', s')
      where
        cmp _                      (_, Nothing, Return _) = True
        cmp (_, Nothing, Return _) _                      = False
        cmp (i, Just a, _)         (i', Just a', _)       = if a == a' then i <= i' else a <= a'
        cmp (i, _, _)              (i', _, _)             = i <= i'

Basically, anything is <= a Return, and all other cases use the a and/or the i to sort Elts. (The errors are for debugging purposes.)

Some helper functions make an Elt from a Stream and a Heap from a list of Streams.

eltFromStream :: (Monad m, Ord a) => Int -> Stream (Of a) m r -> m (Elt a m r)
eltFromStream i (Return r) = return $ Elt i Nothing (Return r)
eltFromStream i (Effect m) = do
    stream' <- m
    return $ Elt i Nothing stream'
eltFromStream i (Step (a :> rest)) = return $ Elt i (Just a) rest

heapFromStreams :: (Monad m, Ord a) => [Stream (Of a) m r] -> m (Heap (Elt a m r))
heapFromStreams strs = H.fromList <$> (sequence $ fmap (uncurry eltFromStream) (zip [0..] strs))

The core piece is the loop function

loop :: (Monad m, Ord a) => Heap (Elt a m r) -> m (Heap (Elt a m r))
loop h = do
    let (Elt i ma s, h') = unsafeUncons h
    elt <- case s of
        Return r         -> return $ Elt i Nothing (Return r)
        Effect m         -> Elt i Nothing <$> m
        Step (a :> rest) -> return $ Elt i (Just a) rest
    return $ H.insert elt h'

with the cheeky unsafeUncons being

unsafeUncons :: Heap a -> (a, Heap a)
unsafeUncons h = case H.uncons h of
    Nothing -> error "Internal error"
    Just x  -> x

The loop function is used in heapMerge that turns the Heap into a Stream

heapMerge :: (Monad m, Ord a) => Heap (Elt a m r) -> Stream (Of a) m [r]
heapMerge h = case (ma,s) of
    (Nothing, Return _) -> Return $ getRs h
    (_, Effect m) -> error "TODO"
    (Just a, _)  -> do
        h' <- lift $ loop h
        Step (a :> heapMerge h')
    where
        Elt i ma s = H.minimum h

getRs just assembles the Return values into a list

getRs :: (Monad m, Ord a) => Heap (Elt a m r) -> [r]
getRs h = snd <$> sortBy (compare `on` fst) (map f (H.toUnsortedList h))
  where
    f :: Monad m => Elt a m r -> (Int, r)
    f (Elt i _ (Return r)) = (i,r)
    f _ = error "Internal error: Call getRs only after stream has finished!"

Then, finally,

bigMerge :: (Monad m, Ord a) => [Stream (Of a) m r] -> Stream (Of a) m [r]
bigMerge streams =
    if null streams then Return []
    else do
        h <- lift $ heapFromStreams streams
        heapMerge h

This is convoluted, Effect is not treated correctly, and it relies on the Return, Step and Effect constructors instead of inspect and next. It does produce the right result on simple inputs, e.g.

s1 = S.each [2,4,5::Int]
s2 = S.each [1,2,4,5::Int]
s3 = S.each [3::Int]
S.print $ bigMerge [s1,s2,s3]

I'm sure there's a way to do this correctly and more idiomatically. For one thing, the Maybe a in the Elt is probably redundant: I could make (Stream (Of a) m r) an instance of Ord directly, and if Effects are only pattern-matched, not executed, this should be OK. But Stream (Of (Heap (Stream (Of a) m r, Int))) (Heap (Int,r)) looks weird. A "stream with index" IStream a m r = IStream Int ((Heap (Stream (Of a) m r) deriving Functor is a functor in r, so, with appropriate == and <=, I'd be looking at Stream (IStream a m) m (Heap (Int, r))?

This functorial aspect of the streaming library is still a bit of a puzzle to me, so any help would be appreciated.

mcmayer

1 Answer

The signature of bigMerge looks awfully like the signature of sequenceA from Data.Traversable:

sequenceA :: Applicative f => [f r] -> f [r]

The problem of course is that we can't use the standard Applicative instance for Streams, because it concatenates instead of merging. But we can try to create our own instance over a newtype:

{-# LANGUAGE DeriveFunctor #-}
import Streaming
import qualified Streaming.Prelude as S

newtype MergeStream a m r = 
    MergeStream { getMergeStream :: Stream (Of a) m r } deriving Functor

-- BEWARE! Only valid for ORDERED streams!
instance (Monad m, Ord a) => Applicative (MergeStream a m) where
    pure x = MergeStream (pure x)
    MergeStream f <*> MergeStream x = MergeStream (uncurry ($) <$> S.merge f x) 

Now, using s1, s2 and s3 from your example, and standard Traversable functions:

ghci> S.toList_ $ getMergeStream . traverse MergeStream $ [s1,s2,s3]
[1,2,2,3,4,4,5,5]
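To make it clearer what traverse does with this instance, here is a minimal sketch of the same idea written as an explicit right fold over pairwise S.merge calls. The name bigMergeFold is made up for illustration, and, as above, the inputs are assumed to be sorted already.

```haskell
import Streaming
import qualified Streaming.Prelude as S

-- Hypothetical name; folds pairwise merges over the list.
-- Assumes every input stream is already sorted in ascending order.
bigMergeFold :: (Monad m, Ord a) => [Stream (Of a) m r] -> Stream (Of a) m [r]
bigMergeFold = foldr step (pure [])
  where
    -- S.merge returns both results as a pair; cons them into the result list.
    step s acc = uncurry (:) <$> S.merge s acc
```

With s1, s2 and s3 as before, S.toList_ (bigMergeFold [s1,s2,s3]) should give the same [1,2,2,3,4,4,5,5]. Because the merges are nested, an element coming from the last stream passes through one comparison per stream in front of it, so the cost per element grows linearly with the number of streams.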

The traverse approach seems to work. That said, your attempt at implementing bigMerge using the Stream internals and a heap might still be worth it, for efficiency reasons.
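For what it's worth, a heap-based version does not seem to need the Stream constructors at all. The following is only a sketch, not a tuned implementation: it assumes that Data.Heap comes from the heaps package (whose Entry type orders by priority only), that all inputs are sorted, and the names bigMergeNext, push and go are invented here. Each stream is advanced with S.next, which runs any pending effects up to the next element, so no separate Effect case is needed.

```haskell
import Control.Monad (foldM)
import qualified Data.Heap as H
import Data.Heap (Entry(..))
import Data.List (sortOn)
import Streaming
import qualified Streaming.Prelude as S

-- Hypothetical name. The heap holds one Entry per unfinished stream:
-- priority = (current head element, stream index), payload = rest of the stream.
bigMergeNext :: (Monad m, Ord a) => [Stream (Of a) m r] -> Stream (Of a) m [r]
bigMergeNext streams = do
    (heap, done) <- lift $ foldM push (H.empty, []) (zip [0 :: Int ..] streams)
    go heap done
  where
    -- Advance one stream to its next element. A finished stream contributes
    -- its (index, result) to the done list; otherwise it is queued on the heap.
    push (heap, done) (i, s) = do
        e <- S.next s
        pure $ case e of
            Left r          -> (heap, (i, r) : done)
            Right (a, rest) -> (H.insert (Entry (a, i) rest) heap, done)

    -- Repeatedly emit the smallest head, then re-queue the stream it came from.
    go heap done = case H.uncons heap of
        Nothing -> pure (map snd (sortOn fst done))  -- results in input order
        Just (Entry (a, i) rest, heap') -> do
            S.yield a
            (heap'', done') <- lift $ push (heap', done) (i, rest)
            go heap'' done'
```

On the example streams, S.toList_ (bigMergeNext [s1,s2,s3]) should again produce [1,2,2,3,4,4,5,5]. Ties between equal elements are broken by the stream index, so elements from earlier streams come first. Note that every stream has to be run one element ahead in order to know its head, so effects happen slightly earlier than the corresponding element is emitted.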

danidiaz