3

Note: If you haven't stumbled upon asynchronous circuits yet, reading my two other Stack Overflow posts (besides John Hughes' article where he wrote them down) on them could come in quite handy: "An ArrowCircuit instance for stream processors which could block", "Hughes' Fibonacci stream".


John Hughes has come up with the following type for asynchronous circuits in his well-known "Generalising Monads to Arrows":

data StreamProcessor a b = Get (a -> StreamProcessor a b) | 
                           Put b    (StreamProcessor a b) |
                           Halt

instance Category StreamProcessor where
    id = Get (\ x -> Put x id)
  
    Put c bc . ab = Put c (bc . ab)                          {- 1 -}
    Get bbc . Put b ab = (bbc b) . ab                        {- 2 -}
    Get bbc . Get aab = Get $ \ a -> (Get bbc) . (aab a)     {- 3 -}
    Get bbc . Halt = Halt                                    {- 4 -}
    Halt . ab = Halt                                         {- 5 -}

bypass :: [b] -> [d] -> StreamProcessor b c -> StreamProcessor (b, d) (c, d)
bypass [] ds (Get f)          = Get $ \ ~(b, d) -> bypass [] (ds ++ [d]) (f b)
bypass (b : bs) [] (Get f)    = bypass bs [] (f b)
bypass [] (d : ds) (Put c sp) = Put (c, d) $ bypass [] ds sp
bypass bs [] (Put c sp) =      
  Get $ \ ~(b, d) -> Put (c, d) (bypass (bs ++ [b]) [] sp)
bypass bs ds Halt             = Halt

instance Arrow StreamProcessor where
  arr f = Get $ \ a -> Put (f a) (arr f)
  first = bypass [] []

instance ArrowChoice StreamProcessor where
    left (Put b ab) = Put (Left b) (left ab)
    left (Get aab)  = Get $ \ a -> case a of 
                                       Left a' -> (left . aab) a'
                                       Right d -> Put (Right d) (left $ Get aab)
    left Halt = Halt

In his other paper ("Programming with arrows") he wrote down a combinator which works for arrows (well, ArrowChoices) like mapM for Monads:

biteOff :: [a] -> Either [a] (a, [a])
biteOff []       = Left []
biteOff (x : xs) = Right (x, xs)

mapA :: ArrowChoice k => k a b -> k [a] [b] 
mapA k = let
            go :: ArrowChoice k => k a b -> k (a, [a]) [b]
            go k = k *** mapA k >>^ uncurry (:)
         in arr biteOff >>> (arr (const []) ||| go k)

Long story short, it does not work on StreamProcessors:

GHCi> Put x sp = Put [1, 2, 3] Halt >>> mapA id
GHCi> x
*** Exception: stack overflow
GHCi> Put x sp = Put [] Halt >>> mapA id
GHCi> x
*** Exception: stack overflow

Interestingly enough, it is not just the value that can't decide, but also the constructor itself:

GHCi> Get f = Put [1, 2, 3] Halt >>> mapA id
GHCi> Put x sp = f ()
GHCi> x
*** Exception: stack overflow

So, here is how I understand whatever's happening:

StreamProcessor has pretty cumbersome rules for composition. To compose two arrows, we often have to know both constructors. Therefore, when we stumble into an infinite series with composition, we just have to hope that the {- 1 -}st and the {- 5 -}th rule for (.) only would be working. We aren't quite lucky:

  • mapA = arr biteOff >>> (arr (const []) ||| go k).

  • arr biteOff >>> (arr (const []) ||| go k) = (arr (const []) ||| go k) . arr biteOff.

  • arr (const []) ||| go k = left (arr (const [])) >>> arr mirror >>> left (go k) >>> arr mirror >>> arr untag. See (|||), mirror and untag here.

  • >>> is infixr. Hence: left (arr (const [])) >>> (arr mirror >>> (left (go k) >>> (arr mirror >>> arr untag))).

  • arr mirror >>> arr untag is a Get, since both of them are Gets (rule {- 3 -} for (.)).

  • left (go k) >>> (arr mirror >>> arr untag) = (arr mirror >>> arr untag) . left (go k) = Get ... . left (go k). Rules from {- 2 -} through {- 4 -} work here. As one can see, we need to pattern-match left (go k) now. A glance at the left tells us that we need to pattern-match go k.

  • go k = k *** mapA k >>^ uncurry (:).

  • k *** mapA k >>^ uncurry (:) = first k >>> arr swap >>> first (mapA k) >>> arr swap >>> arr (uncurry (:)).

  • first k >>> arr swap >>> first (mapA k) >>> arr swap >>> arr (uncurry (:)) = ... . (first (mapA k) . ...). first (mapA k) turned out to be the first argument of some right-to-left composition (since . is infixr), therefore it needs to be pattern-matched. A quick glance at first shows that we need to pattern-match mapA k.

  • Start over.

So, since this didn't quite work out, I thought about writing down some mapSP for StreamProcessors only. Yet, it doesn't seem to be a great idea now:

Say, we applied mapSP to some sp. A list [a1, a2] is input. sp blocks on a1 and goes on with a2. I see no way to deal with this.

So, is my understanding as to why generic mapA does not work for StreamProcessors right? Is there some mapSP :: StreamProcessor a b -> StreamProcessor [a] [b] which would add up, not just type check?

Zhiltsoff Igor
  • 1,812
  • 8
  • 24
  • `mapSP` would be easy to write if you had `concatSP :: [SteamProcessor a b] -> StreamProcessor [a] [b]`. What would be the semantics of `concatSP`? – Benjamin Hodgson Jan 15 '21 at 17:35
  • @BenjaminHodgson let's do this with lists of length 2: if we have `[Put 1 sp, Put 2 sp']`, we would like to get `Put [1, 2] (concatSP [sp, sp'])`. If we have `[Get aab, Get aab']`, we would like to get `Get $ \ [a, a'] -> concatSP [aab a, aab' a']`. If we have `[Put 1 sp, Get aab]`, I see no way to contact them. I reckoned the main idea to be that our stream is either blocked or unblocked. I guess, my lack of experience hinders with understanding. Can you give me another hint, how shall we deal with a list of both blocked and unblocked streams? – Zhiltsoff Igor Jan 15 '21 at 17:48
  • Something tells me it's not possible. Your `Get $ \ [a, a'] -> concatSP [aab a, aab' a']` is rather suspect (nb that irrefutable pattern) – Benjamin Hodgson Jan 15 '21 at 18:38
  • @BenjaminHodgson this was written down for lists of length 2 only. The logic was that if we can't get it working for lists of length 2, other lists won't add any hope. It just seemed that you had a way to write this down in mind. Just as I've said in the post, I can't see any working way neither. – Zhiltsoff Igor Jan 15 '21 at 18:47
  • @BenjaminHodgson please note that this question is more about agreeing or disagreeing with my explanation as to why generic `mapA` does not work for `StreamProcessor`s and why `mapSP` does not exist. Disagreeing would have been a tweaked `mapA` or written down `mapSP` with usage. – Zhiltsoff Igor Jan 15 '21 at 18:50
  • I don't think the "list of length 2" case is the concern. Consider that `f *** f` is pseudo-equivalent to `mapA f` when the input and output lists are always length 2 (you can use `arr` to convert between pairs and 2-element lists). I think the problem is with the generic definition of `mapA` and the fact that input and output lists can be arbitrary lengths. – DDub Jan 15 '21 at 19:56
  • @DDub as I’ve said, “lists of length 2” were merely an illustration to show that there is no way to deal with things like `[Get aab, Put b ab]` (the last but one extract in the post). I agree that generic `mapA` is not working :(. My (first) question is whether I’ve pinpointed the issue right. What do you think? **Besides,** wanted to thank you for the interest in my posts. Always happy to discuss such things :). – Zhiltsoff Igor Jan 15 '21 at 22:49
  • You say there's no way to deal with things like `[Get aab, Put b ab]`, but what happens in `Get aab *** Put b ab`? Aren't these two morally the same? Or, are you suggesting that `***` is as broken as `mapA`? – DDub Jan 15 '21 at 23:23
  • @DDub ah, I get it now. You are right about this example. A poor choice on my behalf. – Zhiltsoff Igor Jan 16 '21 at 07:51

1 Answers1

1

I don't think you're going to get a mapA as general as you want, but I'm also not sure your reasoning for why not is correct. As I see it, the reason mapA can't work is because StreamProcessor is strict in its shape, and mapA as it's defined requires it to be lazy in its shape. On the other hand, your argument about one part blocking while another continues is actually fine.

To see why this "length-2 list" example is really not a problem, we need look only as far as bypass. After all, mapA f specialized to a length-2 list is no different than:

mapA2 :: Arrow k => k a b -> k [a] [b] -- These lists must be length 2!
mapA2 f = arr (\[a,b] -> (a,b)) >>> (f *** f) >>> arr (\(a,b) -> [a,b])

Now, let's construct a couple StreamProcessors, one that's a Get and one that's a Put:

putOnes :: StreamProcessor a Int
putOnes = Put 1 putOnes

sumTwo :: StreamProcessor Int Int
sumTwo = Get $ \x -> Get $ \y -> Put (x+y) sumTwo

And it's no problem to combine them with ***:

> runStreamProcessor (putOnes *** sumTwo) (zip (repeat 1) [1..10])
[(1,3),(1,7),(1,11),(1,15),(1,19)]

If this is okay, then clearly mapA2 works, which means that mapA is not in trouble just because Gets and Puts don't always match up. The mapA just needs to do some synchronizing.


So how should mapA work? Let's consider what it would mean to write mapA putOnes. One might think that it should simply be a Put, especially considering that putOnes contains only Puts. But, if it doesn't have a Get, than what is contained in that Put? It seems that regardless of the input, the output to runStreamProcessor (mapA putOnes) anyOldList will be an infinite list of infinite lists of ones! This is definitely not the desired behavior of mapA, but that's because mapA is only intended to be run with synchronous signal functions. In the synchronous case, there is always one input to one output, which means we morally always start with a Get, and any signal functions beyond the length of the input list can safely be ignored.

This leaves us with two options for a "sensible" mapSP.

  • First, we can pre-define the length of the list. If we know the lengths statically, then this is equivalent to using *** and writing the list as nested pairs (much like I did with mapA2 above).
  • Second, we can force a bit of synchronicity into the system by making mapSP always start with a Get and using the length of the input list to determine what to do. From there, we proceed by caching inputs, much like how bypass works:
mapSP :: StreamProcessor a b -> StreamProcessor [a] [b]
mapSP sp = go (repeat (sp, []))
  where
    -- We keep track of each SP along with its own personal queue of inputs.
    -- We always start with a Get, to determine how many outputs to produce.
    go sps = Get $ \as -> stepPuts (zipWith addA as sps) (drop (length as) sps)

    addA a (s, lst) = (s, lst++[a])

    stepPuts sps rest = case stepPuts' sps of
      Just (unzip -> (xs, sps')) -> Put xs $ stepPuts sps' rest
      Nothing -> go (sps ++ rest)

    stepPuts' [] = Just []
    stepPuts' ((Get f, a:as):rest) = stepPuts' ((f a, as) : rest)
    stepPuts' ((Put x s, as):rest) = ((x, (s, as)):) <$> stepPuts' rest
    stepPuts' _ = Nothing

Note that there are a lot of open choices in how this was written, most of which come down to how much asynchrony we want to keep around. We can explore this by looking at a few examples. Let's start by lifting one from Hughes' paper:

> runStreamProcessor (mapSP (delay 0)) [[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,15]]
[[0,0,0],[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,0]]

Hmm, in Hughes' paper, mapA (delay 0) acted as a "column delayer", producing a list like [[0,0,0],[1,2],[4],[6,5],[7,8,3],[9,10,11,0]], but this mapSP did not. Why? It's because of the first case in stepPuts, which tells us to recur and call stepPuts again. The idea here is that if we have a StreamProcessor that produces multiple outputs per input, perhaps we don't want the mapSP to delay those outputs. For instance:

putTwice :: StreamProcessor a a
putTwice = Get $ \x -> Put x $ Put x putTwice
> runStreamProcessor (mapSP putTwice) [[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,15]]
[[1,2,3],[1,2,3],[4,5],[4,5],[6],[6],[7,8],[7,8],[9,10,11],[9,10,11],[12,13,14,15],[12,13,14,15]]

If we wanted a more synchronous semantics, we could change the definition of stepPuts to:

    stepPuts sps rest = case stepPuts' sps of
      Just (unzip -> (xs, sps')) -> Put xs $ go (sps' ++ rest)
      -- Just (unzip -> (xs, sps')) -> Put xs $ stepPuts sps' rest
      Nothing -> go (sps ++ rest)

Now, we recover something like Hughes' more synchronous mapA semantics:

> runStreamProcessor (mapSP (delay 0)) [[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,15]]
[[0,0,0],[1,2],[4],[6,5],[7,8,3],[9,10,11,0]]
> runStreamProcessor (mapSP putTwice) [[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,15]]
[[1,2,3],[1,2],[4],[4,5],[6,5,3],[6,8,11,15]]
DDub
  • 3,884
  • 1
  • 5
  • 12
  • As for the argument against `mapA`, the lists of length 2 were just an example (a very poor one indeed). The argument itself is the list with dots in the post. I didn't quite catch what you think about it. – Zhiltsoff Igor Jan 16 '21 at 08:07
  • As for `mapSP`, it looks pretty cool, thank you. Yet, I wanted to use it to do things like John Hughes did with tables in his article. Like using `mapA (delay 0)` to delay every **column** (see p. 14 of "Programming with Arrows"). I understand that `Put c1 (Put c2 Halt) >>> mapSP (delay 0)` wouldn't quite work. Is there a way to do it? – Zhiltsoff Igor Jan 16 '21 at 08:10
  • I'm not sure what you mean by "the list with dots in the post". That said, your point that "To compose two arrows, we often have to know both constructors" is exactly right and is a key reason why Hughes' `mapA` doesn't work for `StreamProcessor`. – DDub Jan 16 '21 at 15:43
  • As for your question about column delays, I updated the answer to address that. – DDub Jan 16 '21 at 16:29