I am looking for a function that can do something similar to:
merge :: MonadIO m => [Producer m a] -> Producer m a
I had a quick look at stm-conduit. It looks similar, but I am not sure it fits my requirements:
messagesSource :: MonadIO m => AmqpConn -> Ack -> Text -> Producer m (Message, Envelope)
messagesSource conn ack q = loop
  where
    loop = do
      mmsg <- liftIO $ getMsg chan ack q
      case mmsg of
        Just (m, e) -> do
          yield (m, e)
          liftIO $ ackMsg chan (envDeliveryTag e) False
          loop
        Nothing -> loop
    chan = fst $ amqpChan conn
As you can see, this conduit producer acks a message after yielding it. In a simple "single-threaded" pipeline this works well: the message makes its way to the sink and only then is it acked.
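To illustrate the ordering I am relying on, here is a minimal sketch with the AMQP calls replaced by a log. It assumes conduit 1.3 or later (the `Conduit` module and `ConduitT` instead of `Producer`); `ackingSource`, `singleThreadedLog` and `logRef` are hypothetical names used only for this demonstration.

```haskell
import Conduit
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Hypothetical stand-in for messagesSource: instead of acking over AMQP,
-- it appends "ack n" to a log after each yield.
ackingSource :: MonadIO m => IORef [String] -> [Int] -> ConduitT i Int m ()
ackingSource logRef = mapM_ $ \n -> do
  yield n
  liftIO $ modifyIORef logRef (("ack " ++ show n) :)

-- Runs the source straight into a sink that logs "sink n",
-- then returns the log in chronological order.
singleThreadedLog :: IO [String]
singleThreadedLog = do
  logRef <- newIORef []
  runConduit $
    ackingSource logRef [1, 2]
      .| mapM_C (\n -> modifyIORef logRef (("sink " ++ show n) :))
  reverse <$> readIORef logRef

main :: IO ()
main = singleThreadedLog >>= mapM_ putStrLn
-- prints:
-- sink 1
-- ack 1
-- sink 2
-- ack 2
```

The code after `yield` only runs once the sink asks for the next value, so each "ack" line follows the matching "sink" line.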
However, with stm-conduit this may change because, as far as I understand, the producer would not wait for the message to be consumed by the sink. The two would run in parallel instead, and the message could be acked prematurely.
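To make the concern concrete, here is a rough model of what I think happens once a queue sits between the producer and the sink. This is not stm-conduit's API; it is a sketch that assumes a plain `TBQueue` from the stm package plays the role of the channel. The two sides are run one after the other to keep the output deterministic, whereas in the real case they would run in separate threads. `ackingSource`, `queuedLog` and `logRef` are hypothetical names.

```haskell
import Conduit
import Control.Concurrent.STM (atomically, flushTBQueue, newTBQueueIO, writeTBQueue)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Hypothetical stand-in for messagesSource: logs "ack n" after each yield.
ackingSource :: MonadIO m => IORef [String] -> [Int] -> ConduitT i Int m ()
ackingSource logRef = mapM_ $ \n -> do
  yield n
  liftIO $ modifyIORef logRef (("ack " ++ show n) :)

queuedLog :: IO [String]
queuedLog = do
  logRef <- newIORef []
  queue <- newTBQueueIO 16
  -- Producer side: its "sink" is just the queue, so each yield returns
  -- as soon as the value has been enqueued.
  runConduit $
    ackingSource logRef [1, 2] .| mapM_C (atomically . writeTBQueue queue)
  -- Consumer side: the real sink only sees the values later.
  items <- atomically (flushTBQueue queue)
  runConduit $
    yieldMany items
      .| mapM_C (\n -> modifyIORef logRef (("sink " ++ show n) :))
  reverse <$> readIORef logRef

main :: IO ()
main = queuedLog >>= mapM_ putStrLn
-- prints:
-- ack 1
-- ack 2
-- sink 1
-- sink 2
```

In this model the ack is tied to the enqueue rather than to the sink finishing its work, which is exactly the premature ack I am worried about.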
Is my understanding of stm-conduit correct?
And what would be the way to merge separate sources into one while keeping these single-stream semantics?
UPDATE: Updated code to a real working AMQP example as requested (however it may be a bit noisier).
UPDATE 2: I think what I am after could be an Alternative instance for conduit sources so I could do something like let src = src1 <|> src2. Is it possible somehow?