I am trying to understand the "Streams as arrows" section in John Hughes' famous "Generalising Arrows to Monads". To be more precise, I am interested in writing down the Fibonacci stream.
I tweaked Hughes' definition a bit:
data StreamProcessor a b = Get (a -> StreamProcessor a b) |
Put b (StreamProcessor a b) |
Halt
put = Put
get = Get
First of all, I treat stream processors as lists which may block (waiting for input). That is:
Put :: b -> StreamProcessor a b -> StreamProcessor a b
matches(:) :: a -> [a] -> [a]
;Halt :: StreamProcessor a b
matches[] :: [a]
;Get :: (a -> StreamProcessor a b) -> StreamProcessor a b
helps us block the stream and wait for input.
Therefore, if we drop the Get
we get a list-like structure. If we also drop Halt
we get an infinite-list-like structure.
Here are two ways I would understand "a stream of Fibonaccis":
a non-blocked infinite stream (infinite-list-like):
zipNonBlockedStreamsWith :: (a -> b -> c) -> StreamProcessor () a -> StreamProcessor () b -> StreamProcessor () c zipNonBlockedStreamsWith f (Put x sp) (Put y sp') = Put (f x y) (zipNonBlockedStreamsWith f sp sp') zipNonBlockedStreamsWith f Halt sp = Halt zipNonBlockedStreamsWith f sp Halt = Halt fibs :: StreamProcessor () Int fibs = put 0 (put 1 $ zipNonBlockedStreamsWith (+) fibs (tailNonBlockedStream fibs)) -- matches a well-known definition of an infinite Fibonacci list. fibsList :: [Int] fibsList = 0 : 1 : (zipWith (+) fibsList (tail fibsList)) -- with the 'fibsList', we can use folds to do the same thing. putStream :: [b] -> StreamProcessor a b -> StreamProcessor a b putStream bs sp = foldr Put sp bs fibs' :: StreamProcessor () Int fibs' = putStream fibsList Halt
A blocked stream waits for
n
, outputs then
th Fibonacci number and blocks again. Hughes'Arrow
interface helps us express it in a quite concise way:instance Category StreamProcessor where ... instance Arrow StreamProcessor where arr f = Get $ \ a -> Put (f a) (arr f) ... fibsList :: [Int] fibsList = 0 : 1 : (zipWith (+) fibsList (tail fibsList)) blockedFibs :: StreamProcessor Int Int blockedFibs = arr (fibsList !!)
Yet, in the paper I linked John Hughes shows another solution, Arrow
ing his way through:
instance Category StreamProcessor where
id = Get (\ x -> Put x id)
Put c bc . ab = Put c (bc . ab)
Get bbc . Put b ab = (bbc b) . ab
Get bbc . Get aab = Get $ \ a -> (Get bbc) . (aab a)
Get bbc . Halt = Halt
Halt . ab = Halt
bypass :: [b] -> [d] -> StreamProcessor b c -> StreamProcessor (b, d) (c, d)
bypass [] ds (Get f) = Get $ \ ~(b, d) -> bypass [] (ds ++ [d]) (f b)
bypass (b : bs) [] (Get f) = bypass bs [] (f b)
bypass [] (d : ds) (Put c sp) = Put (c, d) $ bypass [] ds sp
bypass bs [] (Put c sp) =
Get $ \ ~(b, d) -> Put (c, d) (bypass (bs ++ [b]) [] sp)
bypass bs ds Halt = Halt
instance Arrow StreamProcessor where
arr f = Get $ \ a -> Put (f a) (arr f)
first = bypass [] []
liftArr2 :: Arrow k => (a -> b -> c) -> k r a -> k r b -> k r c
liftArr2 f a b = a &&& b >>^ uncurry f
fibsHughes = let
fibsHughes' = put 1 (liftArr2 (+) fibsHughes fibsHughes')
in put 0 fibsHughes'
But it does not work the way I expect. The following function would help us take the values from the stream until it blocks or halts (using Data.List.unfoldr
):
popToTheBlockOrHalt :: StreamProcessor a b -> [b]
popToTheBlockOrHalt = let
getOutput (Put x sp) = Just (x, sp)
getOutput getOrHalt = Nothing
in unfoldr getOutput
So, here is what we get:
GHCi> popToTheBlockOrHalt fibsHughes
[0, 1]
GHCi> :t fibsHughes
fibsHughes :: StreamProcessor a Integer
If we check the patterns, we would see that it blocks (that is we stumble into Get
).
I cannot tell whether it should be that way. If it is what we want, why? If not, what is the problem? I checked and rechecked the code I wrote and it pretty much matches the definitions in Hughes' article (well, I had to add id
and patterns for Halt
- I cannot see how they could have messed the process up).
Edit: As it is said in the comments, in the later edition of the paper bypass
was slightly changed (we use that one). It is able to accumulate both withheld b
s and d
s (that is has two queues), whereas the bypass
from the original paper accumulates just d
s (that is one queue).
Edit #2: I just wanted to write down a function which would pop the Fibonacci numbers from the fibsHughes
:
popToTheHaltThroughImproperBlocks :: StreamProcessor () b -> [b]
popToTheHaltThroughImproperBlocks = let
getOutput (Put x sp) = Just (x, sp)
getOutput (Get c) = getOutput $ c ()
getOutput Halt = Nothing
in unfoldr getOutput
And here we go:
GHCi> (take 10 . popToTheHaltThroughImproperBlocks) fibsHughes
[0,1,1,2,3,5,8,13,21,34]