The goal is to generalize the Streaming.merge function,
merge :: (Monad m, Ord a) => Stream (Of a) m r -> Stream (Of a) m s -> Stream (Of a) m (r, s)
to an arbitrary number of source streams. The strategy is to use a Data.Heap.Heap of Stream (Of a) m r, ordered by the current head element (of type a) of each stream. That is, bigMerge would have the signature
bigMerge :: (Monad m, Ord a) => [Stream (Of a) m r] -> Stream (Of a) m [r]
(The list could just as well be replaced by a Heap.)
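To make the strategy concrete without any streaming machinery, here is a minimal sketch of the same k-way merge on plain sorted lists. It assumes Data.Map.Strict (from containers) as the priority queue instead of Data.Heap. Keys are (current head, list index), so they stay unique and the smallest key is always the next element to emit. kMerge is a hypothetical name, not part of any library.

```haskell
import qualified Data.Map.Strict as M

-- Hypothetical helper: k-way merge of already-sorted lists.
-- Assumption: a Map keyed by (current head, list index) stands in for Data.Heap;
-- the index keeps keys unique and breaks ties in favour of earlier lists.
kMerge :: Ord a => [[a]] -> [a]
kMerge xss = go (M.fromList [((x, i), rest) | (i, x : rest) <- zip [0 :: Int ..] xss])
  where
    -- Emit the smallest head, then re-insert the remainder of that list only.
    go q = case M.minViewWithKey q of
      Nothing -> []
      Just (((x, i), rest), q') -> x : go (push i rest q')
    -- An exhausted list is simply dropped from the queue.
    push _ [] q = q
    push i (y : ys) q = M.insert (y, i) ys q

main :: IO ()
main = print (kMerge [[2, 4, 5], [1, 2, 4, 5], [3 :: Int]])  -- [1,2,2,3,4,4,5,5]
```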
What I have is a rather evil concoction that isn't quite correct. Here it is.
For completeness, first the imports:
import qualified Data.Heap as H
import Data.Heap (Heap)
import Data.List (sortBy)
import Data.Function (on)
import Streaming
import qualified Streaming.Prelude as S
import Streaming.Internal (Stream(..)) -- shouldn't be needed!
To use a Heap, the element type needs an Ord instance, so each stream is wrapped in an Elt:
data Elt a m r = Elt Int (Maybe a) (Stream(Of a) m r)
The extra Int carries the index of the stream in the input list, so that the returned [r] can be built with its elements in the right order. The Maybe a carries the current head element of the stream, if one has been read.
The Eq and Ord instances are:
instance Eq a => Eq (Elt a m r) where
  (Elt i ma _) == (Elt i' ma' _) =
    if i == i' then error "Internal error: Index clash in =="
    else ma == ma'

instance Ord a => Ord (Elt a m r) where
  (Elt i ma s) <= (Elt i' ma' s')
    | i == i' = error "Internal error: Index clash in <="
    | otherwise = cmp (i, ma, s) (i', ma', s')
    where
      cmp _ (_, Nothing, Return _) = True
      cmp (_, Nothing, Return _) _ = False
      cmp (i, Just a, _) (i', Just a', _) = if a == a' then i <= i' else a <= a'
      cmp (i, _, _) (i', _, _) = i <= i'
Basically, every Elt is <= a finished one (Nothing paired with Return), so finished streams sort after all others; all other cases use the a and/or the i to order Elts. (The errors are for debugging purposes.)
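Note that the hand-written instances are not fully consistent with each other: == compares only the Maybe a, while <= also consults the index and the Return constructor. A minimal sketch of an alternative, assuming finished streams are removed from the queue rather than sorted to the end: keep only a plain key with derived instances, which compare lexicographically (head first, index as tie-breaker) and always agree with each other. Key is a hypothetical type.

```haskell
import Data.List (sort)

-- Hypothetical priority key: current head value first, stream index second.
-- Derived Eq and Ord compare fields left to right, so they never disagree,
-- and the index only matters when two heads are equal.
data Key a = Key a Int
  deriving (Eq, Ord, Show)

main :: IO ()
main = print (sort [Key 4 0, Key (2 :: Int) 2, Key 2 1])  -- [Key 2 1,Key 2 2,Key 4 0]
```

With this key, "is the stream finished?" is no longer part of the ordering; a finished stream is simply never inserted again.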
Some helper functions build an Elt from a Stream and a Heap from a list of Streams:
eltFromStream :: (Monad m, Ord a) => Int -> Stream (Of a) m r -> m (Elt a m r)
eltFromStream i (Return r) = return $ Elt i Nothing (Return r)
eltFromStream i (Effect m) = do
  -- runs one effect layer only; stream' may itself be an Effect or a Step
  stream' <- m
  return $ Elt i Nothing stream'
eltFromStream i (Step (a :> rest)) = return $ Elt i (Just a) rest

heapFromStreams :: (Monad m, Ord a) => [Stream (Of a) m r] -> m (Heap (Elt a m r))
heapFromStreams strs = H.fromList <$> (sequence $ fmap (uncurry eltFromStream) (zip [0..] strs))
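The Effect case above stops after a single effect layer, which is what leaves an Elt with Nothing but an unfinished stream. A minimal sketch of the initialization step using Streaming.Prelude.next instead of the Internal constructors: next runs effects until it reaches the first element or the end of the stream. pullHeads is a hypothetical helper, and Data.Map.Strict (keyed by (head, index) for live streams and by index for finished ones) is an assumed stand-in for Data.Heap.

```haskell
import Control.Monad (zipWithM)
import Data.Either (partitionEithers)
import qualified Data.Map.Strict as M
import Streaming
import qualified Streaming.Prelude as S

-- Hypothetical replacement for eltFromStream/heapFromStreams.
-- S.next executes any leading Effect layers and stops at the first element
-- or at the end, so each stream is either live (with a head) or finished
-- (with its return value); there is no third "Nothing but not done" state.
pullHeads
  :: (Monad m, Ord a)
  => [Stream (Of a) m r]
  -> m (M.Map (a, Int) (Stream (Of a) m r), M.Map Int r)
pullHeads strs = do
  es <- zipWithM pull [0 ..] strs
  let (done, live) = partitionEithers es
  pure (M.fromList live, M.fromList done)
  where
    -- Tag each stream with its index so equal heads remain distinct keys.
    pull i s = do
      e <- S.next s
      pure $ case e of
        Left r -> Left (i, r)
        Right (a, rest) -> Right ((a, i), rest)

main :: IO ()
main = do
  (live, done) <- pullHeads
    [ S.each [2, 4 :: Int]
    , effect (putStrLn "effect ran" >> pure (S.each [1]))  -- starts with an Effect
    , pure ()                                               -- already finished
    ]
  print (M.keys live)  -- [(1,1),(2,0)]
  print (M.keys done)  -- [2]
```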
The core piece is the loop function, which pops the minimal Elt off the heap, advances its stream by one layer and reinserts it:
loop :: (Monad m, Ord a) => Heap (Elt a m r) -> m (Heap (Elt a m r))
loop h = do
  let (Elt i ma s, h') = unsafeUncons h
  elt <- case s of
    Return r -> return $ Elt i Nothing (Return r)
    Effect m -> Elt i Nothing <$> m
    Step (a :> rest) -> return $ Elt i (Just a) rest
  return $ H.insert elt h'
with the cheeky unsafeUncons being
unsafeUncons :: Heap a -> (a, Heap a)
unsafeUncons h = case H.uncons h of
  Nothing -> error "Internal error"
  Just x -> x
The loop function is used in heapMerge, which turns the Heap into a Stream:
heapMerge :: (Monad m, Ord a) => Heap (Elt a m r) -> Stream (Of a) m [r]
heapMerge h = case (ma, s) of
    (Nothing, Return _) -> Return $ getRs h
    (_, Effect m) -> error "TODO"
    (Just a, _) -> do
      h' <- lift $ loop h
      Step (a :> heapMerge h')
  where
    Elt i ma s = H.minimum h
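The TODO case, and the (Nothing, Step _) case that heapMerge does not match at all, disappear if the queue never holds a stream whose head has not been extracted yet. A minimal sketch of the whole merge along those lines, using only the public API (Streaming.Prelude.next, Streaming.Prelude.yield, lift) instead of Step, Effect and Return. It assumes Data.Map.Strict as the priority queue in place of Data.Heap; this is one possible design, not the canonical one.

```haskell
import Control.Monad (foldM)
import qualified Data.Map.Strict as M
import Streaming
import qualified Streaming.Prelude as S

-- Sketch of a k-way merge. Assumption: live streams sit in a Map keyed by
-- (current head, stream index), which plays the role of the heap; return
-- values of finished streams sit in a Map keyed by stream index.
bigMerge :: (Monad m, Ord a) => [Stream (Of a) m r] -> Stream (Of a) m [r]
bigMerge strs = do
  queues <- lift (foldM (\qs (i, s) -> advance i s qs) (M.empty, M.empty)
                        (zip [0 :: Int ..] strs))
  uncurry go queues
  where
    -- Run stream i up to its next element (executing any effects on the way)
    -- and file it under its new head, or under its result if it has ended.
    advance i s (live, done) = do
      e <- S.next s
      pure $ case e of
        Left r -> (live, M.insert i r done)
        Right (a, rest) -> (M.insert (a, i) rest live, done)

    -- Yield the smallest head, advance only the stream it came from, repeat.
    go live done = case M.minViewWithKey live of
      Nothing -> pure (M.elems done)  -- all streams finished; results in input order
      Just (((a, i), rest), live') -> do
        S.yield a
        queues <- lift (advance i rest (live', done))
        uncurry go queues

main :: IO ()
main = do
  rs <- S.print (bigMerge [ S.each [2, 4, 5 :: Int] >> pure 'a'
                          , S.each [1, 2, 4, 5] >> pure 'b'
                          , S.each [3] >> pure 'c' ])
  print rs  -- after 1 2 2 3 4 4 5 5 (one per line), prints "abc"
```

Each yielded element costs one Map deletion and at most one insertion, i.e. O(log k) for k streams, which matches what the heap-based version aims for.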
getRs just assembles the Return values into a list, ordered by stream index:
getRs :: (Monad m, Ord a) => Heap (Elt a m r) -> [r]
getRs h = snd <$> sortBy (compare `on` fst) (map f (H.toUnsortedList h))
  where
    f :: Monad m => Elt a m r -> (Int, r)
    f (Elt i _ (Return r)) = (i, r)
    f _ = error "Internal error: Call getRs only after all streams have finished!"
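If the results of finished streams are kept in a Data.Map keyed by stream index (an assumption; the code above keeps them inside the Heap), the sortBy (compare `on` fst) step is unnecessary, because Data.Map.elems returns values in ascending key order. collectResults is a hypothetical helper illustrating just that point.

```haskell
import qualified Data.Map.Strict as M

-- Hypothetical helper: stream indices are unique, so fromList loses nothing,
-- and elems lists the results in ascending index order.
collectResults :: [(Int, r)] -> [r]
collectResults = M.elems . M.fromList

main :: IO ()
main = print (collectResults [(2, "s3"), (0, "s1"), (1, "s2")])  -- ["s1","s2","s3"]
```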
Then, finally,
bigMerge :: (Monad m, Ord a) => [Stream (Of a) m r] -> Stream (Of a) m [r]
bigMerge streams =
  if null streams then Return []
  else do
    h <- lift $ heapFromStreams streams
    heapMerge h
This is convoluted: Effect is not treated correctly, and the code relies on the Return, Step and Effect constructors instead of inspect and next. It does produce the right result on simple inputs, e.g.
s1 = S.each [2,4,5::Int]
s2 = S.each [1,2,4,5::Int]
s3 = S.each [3::Int]
S.print $ bigMerge [s1,s2,s3]
I'm sure there's a way to do this correctly and more idiomatically. For one thing, the Maybe a in Elt is probably redundant: I could make Stream (Of a) m r an instance of Ord directly, and if Effects are only pattern-matched, not executed, this should be OK. But a type like Stream (Of (Heap (Stream (Of a) m r, Int))) m (Heap (Int, r)) looks weird. A "stream with index"
data IStream a m r = IStream Int (Heap (Stream (Of a) m r)) deriving Functor
is a functor in r, so, with appropriate == and <=, would I be looking at Stream (IStream a m) m (Heap (Int, r))?
This functorial aspect of the streaming library is still a bit of a puzzle to me, so any help would be appreciated.
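On the functorial aspect: Stream f m r is a Functor in r (for Functor f and Monad m), and that alone already gives a simple, if less efficient, generalization of S.merge without any heap. Fold the list with binary merges and fmap each (r, [r]) pair into a list. This swaps in a different technique (a chain of merges rather than a heap): an element may pass through up to k merge stages, so it costs O(k) comparisons per element instead of O(log k). mergeAll is a hypothetical name, and the sketch is mainly useful as a reference to test a heap-based version against.

```haskell
import Streaming
import qualified Streaming.Prelude as S

-- Hypothetical reference implementation: a right-nested chain of S.merge.
-- S.merge s acc returns (r, [r]); fmap over the stream's return value
-- (Stream (Of a) m is a Functor in r) turns that pair into [r].
mergeAll :: (Monad m, Ord a) => [Stream (Of a) m r] -> Stream (Of a) m [r]
mergeAll = foldr (\s acc -> fmap (uncurry (:)) (S.merge s acc)) (pure [])

main :: IO ()
main = do
  rs <- S.print (mergeAll [ S.each [2, 4, 5 :: Int] >> pure 'a'
                          , S.each [1, 2, 4, 5] >> pure 'b'
                          , S.each [3] >> pure 'c' ])
  print rs  -- after 1 2 2 3 4 4 5 5 (one per line), prints "abc"
```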