I've used `zipSinks :: Monad m => Sink i m r -> Sink i m r' -> Sink i m (r, r')` for this, but it is considered deprecated.

What behavior, *exactly*, do you want the "combined" sinks to have? I tried looking at the old documentation and implementation of `zipSinks`, but the behavior is not easily discernible at a glance. – Dan Burton Aug 07 '12 at 15:58
@DanBurton: `zipSinks` takes two Sinks and returns a Sink that produces a pair with the results of the corresponding Sinks. For example, `sizeCrc32Sink = zipSinks sizeSink crc32Sink` will compute both the size and the checksum. I want the same behavior as described by Oleg [here](http://okmij.org/ftp/Streams.html#1enum2iter). – tymmym Aug 07 '12 at 16:46
Ok I see; it basically hooks up the awaits and feeds the upstream output to both sinks simultaneously, sort of forking the input stream in two. The docs for Data.Conduit.Util state that "there are now easier ways to handle their use cases" but I see no easier way for this use case, as it requires delving into conduit internals to implement. – Dan Burton Aug 08 '12 at 03:46
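To make the forking behavior described above concrete, here is a minimal sketch on a toy `Sink` type defined from scratch. It is not the conduit API and says nothing about how `Data.Conduit.Util.zipSinks` is actually implemented; the names `feed`, `runSink`, and `sumSink` are made up for illustration, and `sumSink` is only a stand-in for a real checksum sink.

```haskell
-- Toy model, NOT the conduit API: a sink is either finished with a result
-- or waiting for the next input (Nothing marks the end of the stream).
data Sink i r
  = Done r
  | Await (Maybe i -> Sink i r)

-- Give one input (or end-of-stream) to a sink; finished sinks ignore it.
feed :: Maybe i -> Sink i r -> Sink i r
feed _ (Done r) = Done r
feed mi (Await k) = k mi

-- Fork the input: every upstream value goes to both sinks, and the
-- combined sink finishes once both children have finished.
zipSinks :: Sink i r -> Sink i r' -> Sink i (r, r')
zipSinks (Done a) (Done b) = Done (a, b)
zipSinks s1 s2 = Await (\mi -> zipSinks (feed mi s1) (feed mi s2))

-- Drive a sink from a list, sending Nothing once the list is exhausted.
runSink :: [i] -> Sink i r -> r
runSink _ (Done r) = r
runSink (x : xs) (Await k) = runSink xs (k (Just x))
runSink [] (Await k) = runSink [] (k Nothing)

-- Counts the inputs (hypothetical stand-in for sizeSink).
sizeSink :: Sink i Int
sizeSink = go 0
  where
    go n = Await (maybe (Done n) (const (go (n + 1))))

-- Sums the inputs (hypothetical stand-in for crc32Sink).
sumSink :: Sink Int Int
sumSink = go 0
  where
    go acc = Await (maybe (Done acc) (\x -> go (acc + x)))

-- Both results come from a single pass over the same input.
example :: (Int, Int)
example = runSink [7, 8, 9] (zipSinks sizeSink sumSink)
-- (3,24)
```

The point of the sketch is only the shape of `zipSinks`: it has to inspect the state of both sinks at every step, which is why a version for the real library seems to need access to the internal constructors.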
2 Answers
Edit
After considering this, I don't think it is possible with the current version of `Data.Conduit`. Pipes aren't Categories, so `&&&` is out of the question. And there's no way that I can think of to pull results from upstream, feed them incrementally to both sinks, and short-circuit when the first sink finishes. (Although I don't think that `Data.Conduit.Util.zipSinks` short-circuits this way, it seems like it would be very desirable.) Except, of course, to pattern match on both Sinks (like `zipSinks` in the package does), but that's what we're trying to avoid here.
That said, I would love to be proven wrong here.
It's not pretty, but you can do this in a kind-of obvious way.
First imports:
    module Main where

    import Control.Monad.Trans
    import Data.Conduit
    import qualified Data.Conduit.Binary as CB
    import qualified Data.Conduit.List as CL
    import qualified Data.Conduit.Text as CT
    import qualified Data.Conduit.Util as CU
    import Data.Maybe
    import Data.Text (unpack)
Now for `zipSinks`. Basically, you want to create a sink that pulls the input from upstream and sends it to each child sink separately. In this case, I've used `CL.sourceList` to do this. If `await` returns `Nothing`, `maybeToList` returns an empty list, so the child sinks are also run with no input. Finally, the output of each child sink is then fed into the tuple.
    zipSinks :: Monad m => Sink i m r -> Sink i m r' -> Sink i m (r, r')
    zipSinks s1 s2 = do
      l  <- fmap maybeToList await
      o1 <- lift $ CL.sourceList l $$ s1
      o2 <- lift $ CL.sourceList l $$ s2
      return (o1, o2)
Here are some examples of using `zipSinks`. It appears to work fine both inside of `IO` and outside of it, and in the few tests I did, the output matches the output of `zipped'`, created using the old `zipSinks`.
    doubleHead :: Monad m => Sink Int m (Maybe Int)
    doubleHead = await >>= return . fmap (2*)

    -- old version
    zipped' :: Monad m => Sink Int m (Maybe Int, Maybe Int)
    zipped' = CU.zipSinks CL.head doubleHead

    -- new version
    zipped :: Monad m => Sink Int m (Maybe Int, Maybe Int)
    zipped = zipSinks CL.head doubleHead

    fromList = CL.sourceList [7, 8, 9] $$ zipped
    -- (Just 7, Just 14)

    fromFile :: String -> IO (Maybe Int, Maybe Int)
    fromFile filename = runResourceT $
           CB.sourceFile filename
        $= CB.lines
        $= CT.decode CT.utf8
        $= CL.map (read . unpack)
        $$ zipped

    -- for a file with the lines:
    --
    -- 1
    -- 2
    -- 3
    --
    -- returns (Just 1, Just 2)

Nice! (NB, you could write `await >>= return . fmap (2*)` for `doubleHead`, and similarly, `l <- fmap maybeToList await` instead of using `input` in `zipSinks`. Also, is `Data.Conduit.Internals` an extraneous import?) – huon Aug 15 '12 at 19:31
Yeah, I realized that I probably could have used functors in some places. I'm still enough of a n00b at Haskell that that's usually a second-pass edit for me, unfortunately. And yes, `Data.Conduit.Internals` is extraneous. Originally, I was looking at using `sinkToPipe` from it. Thanks for pointing these out. I'll update the answer. – Eric Aug 15 '12 at 20:14
Your version of `zipSinks` combines only the first elements. For example, `runResourceT $ CL.sourceList [1,2,3] $$ zipSinks (CL.take 2) (CL.take 2)` will return `([1],[1])` but should return `([1,2],[1,2])`. – tymmym Aug 16 '12 at 05:49
Hmm. That's a problem. I think there might be a way to do it with the arrow fan-out operator (`&&&`), but I haven't teased it out yet. – Eric Aug 16 '12 at 13:57
(The package is conduit-0.5.2.3. The whole module is just for backwards compatibility.)
[edit]
So, my straightforward monadic guess (see below) seems to be wrong, even though the types are correct. Now, I can only guess that the answer is:
The replacement features are still in development, much like all Pipe/Conduit and similar concepts and libraries.
I'd wait for the next API to solve this question and keep using `zipSinks` until then.
(Maybe it was just misplaced.)
[/edit]
I'm not that familiar with this package, but wouldn't this do just the same?
    zipSinks :: Monad m => Sink i m r -> Sink i m r' -> Sink i m (r, r')
    zipSinks s1 s2 = (,) <$> s1 <*> s2
It is a Monad after all. (Functor, Applicative)
    zipSinks :: Monad sink => sink r -> sink r' -> sink (r, r')
    zipSinks s1 s2 = liftM2 (,) s1 s2

The types are OK, but not the semantics. Your version of `zipSinks` will run the sinks consecutively, and the first sink will fully consume the source. – tymmym Aug 15 '12 at 05:56
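The difference between the types and the semantics can be seen on a toy model. The sketch below defines its own small `Sink` type with a `Monad` instance; it is an assumption-laden illustration, not the conduit implementation, and `runSink` and `consume` are hypothetical helpers written just for this example.

```haskell
-- Toy model, NOT the conduit API: a sink is either finished with a result
-- or waiting for the next input (Nothing marks the end of the stream).
data Sink i r
  = Done r
  | Await (Maybe i -> Sink i r)

instance Functor (Sink i) where
  fmap f (Done r) = Done (f r)
  fmap f (Await k) = Await (fmap f . k)

instance Applicative (Sink i) where
  pure = Done
  sf <*> sx = sf >>= \f -> fmap f sx

-- Sequencing: the second sink only starts once the first has finished,
-- so it sees whatever input the first one left behind.
instance Monad (Sink i) where
  Done r >>= f = f r
  Await k >>= f = Await (\mi -> k mi >>= f)

-- Drive a sink from a list, sending Nothing once the list is exhausted.
runSink :: [i] -> Sink i r -> r
runSink _ (Done r) = r
runSink (x : xs) (Await k) = runSink xs (k (Just x))
runSink [] (Await k) = runSink [] (k Nothing)

-- Collects every remaining input into a list.
consume :: Sink i [i]
consume = go []
  where
    go acc = Await (maybe (Done (reverse acc)) (\x -> go (x : acc)))

-- The applicative pairing type-checks, but the first sink drains the
-- stream and the second one starts with nothing left to read.
sequential :: ([Int], [Int])
sequential = runSink [1, 2, 3] ((,) <$> consume <*> consume)
-- ([1,2,3],[])
```

A sink that forks the input would have to hand each element to both children, so both components would be `[1,2,3]`; the monadic combination cannot express that because `>>=` only ever runs one sink at a time.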