Say I have a simple producer/consumer model where the consumer wants to pass some state back to the producer. For instance, let the downstream-flowing objects be objects we want to write to a file and the upstream objects be tokens representing where each object was written in the file (e.g. an offset).
These two processes might look something like this (with pipes-4.0),
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

import Pipes
import Pipes.Core
import Control.Monad.Trans.State
import Control.Monad

newtype Object = Obj Int
               deriving (Show)

newtype ObjectId = ObjId Int
                 deriving (Show, Num)

writeObjects :: Proxy ObjectId Object () X IO r
writeObjects = evalStateT (forever go) (ObjId 0)
  where go = do i <- get
                obj <- lift $ request i
                lift $ lift $ putStrLn $ "Wrote "++show obj
                modify (+1)

produceObjects :: [Object] -> Proxy X () ObjectId Object IO ()
produceObjects = go
  where go [] = return ()
        go (obj:rest) = do
            lift $ putStrLn $ "Producing "++show obj
            objId <- respond obj
            lift $ putStrLn $ "Object "++show obj++" has ID "++show objId
            go rest

objects = [ Obj i | i <- [0..10] ]

Simple as this might be, I've had a fair bit of difficulty reasoning about how to compose them. Ideally, we'd want a push-based flow of control like the following,

1. writeObjects starts by blocking on request, having sent the initial ObjId 0 upstream.
2. produceObjects sends the first object, Obj 0, downstream.
3. writeObjects writes the object and increments its state, and waits on request, this time sending ObjId 1 upstream.
4. respond in produceObjects returns with ObjId 0.
5. produceObjects continues at Step (2) with the second object, Obj 1.

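
To make the target concrete, the interleaving described in these steps can be written down as a small pure model that does not use pipes at all. This is only a sketch of the output I am hoping for; desiredTrace is a hypothetical helper introduced for illustration, and the types are redefined here so the snippet stands alone.

```haskell
-- A pure model of the desired interleaving; it does not use pipes.
newtype Object = Obj Int
               deriving (Show)

newtype ObjectId = ObjId Int
                 deriving (Show)

-- desiredTrace is a hypothetical helper: for each object, in order, the
-- producer announces it, the writer writes it, and the producer then
-- learns the ID the writer held when that object arrived.
desiredTrace :: [Object] -> [String]
desiredTrace objs = concat (zipWith step ids objs)
  where
    ids = map ObjId [0 ..]
    step objId obj =
      [ "Producing " ++ show obj
      , "Wrote " ++ show obj
      , "Object " ++ show obj ++ " has ID " ++ show objId
      ]

main :: IO ()
main = mapM_ putStrLn (desiredTrace [ Obj i | i <- [0 .. 2] ])
```

In this model Obj 0 is paired with ObjId 0, Obj 1 with ObjId 1, and so on, which is the pairing a correct composition should reproduce.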
My initial attempt was with push-based composition as follows,
main = void $ run $ produceObjects objects >>~ const writeObjects
Note the use of const to work around the otherwise incompatible types (this is likely where the problem lies). In this case, however, we find that ObjId 0 gets eaten,
Producing Obj 0
Wrote Obj 0
Object Obj 0 has ID ObjId 1
Producing Obj 1
...
A pull-based approach,
main = void $ run $ const (produceObjects objects) +>> writeObjects
suffers a similar issue, this time dropping Obj 0.
How might one go about composing these pieces in the desired manner?
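
One direction the type of (>>~) seems to suggest, offered as a sketch rather than a confirmed answer: its right-hand side is a function from the first downstream value, so the writer could take the first Object as an argument instead of discarding it with const. The sketch below makes two assumptions. First, changing writeObjects to take an argument is my own variation, not something the library prescribes. Second, it uses runEffect, which I believe is the name later pipes-4.x releases give to the run used above; with pipes-4.0 itself, substitute run.

```haskell
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

import Pipes
import Pipes.Core
import Control.Monad.Trans.State

newtype Object = Obj Int
               deriving (Show)

newtype ObjectId = ObjId Int
                 deriving (Show, Num)

-- Variation (an assumption, not the original definition): the writer takes
-- the first Object as an argument, which is the shape (>>~) expects on its
-- right-hand side, so no `const` is needed.
writeObjects :: Object -> Proxy ObjectId Object () X IO r
writeObjects obj0 = evalStateT (go obj0) (ObjId 0)
  where
    go obj = do
      i <- get
      lift $ lift $ putStrLn $ "Wrote " ++ show obj
      modify (+1)
      -- Send the ID of the object just written upstream, then wait for
      -- the next object to arrive.
      next <- lift $ request i
      go next

-- Same producer as above.
produceObjects :: [Object] -> Proxy X () ObjectId Object IO ()
produceObjects = go
  where
    go [] = return ()
    go (obj:rest) = do
      lift $ putStrLn $ "Producing " ++ show obj
      objId <- respond obj
      lift $ putStrLn $ "Object " ++ show obj ++ " has ID " ++ show objId
      go rest

objects :: [Object]
objects = [ Obj i | i <- [0..10] ]

-- runEffect is assumed to be the later pipes-4.x name for `run`.
main :: IO ()
main = runEffect $ produceObjects objects >>~ writeObjects
```

By my reading of (>>~), this should begin with "Producing Obj 0", "Wrote Obj 0", "Object Obj 0 has ID ObjId 0", with neither the first object nor the first ID lost, though I have not checked it against pipes-4.0 itself. The trade-off is that the writer no longer starts out blocked on request as in Step (1); the producer initiates the exchange instead.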