I am looking for a function that can do something similar to:

merge :: MonadIO m => [Producer m a] -> Producer m a

I had a quick look at stm-conduit. It looks similar, but I am not sure it fits my requirements:

messagesSource :: MonadIO m => AmqpConn -> Ack -> Text -> Producer m (Message, Envelope)
messagesSource conn ack q = loop
  where
    loop = do
      mmsg <- liftIO $ getMsg chan ack q
      case mmsg of
        Just (m, e) -> do
          yield (m, e)
          liftIO $ ackMsg chan (envDeliveryTag e) False
          loop
        Nothing     -> loop
    chan = fst $ amqpChan conn

As you can see, this conduit producer acks a message after yielding it. In a simple "single-threaded" pipeline this works well: the message makes its way to the sink and is then acked.

However, with stm-conduit this may change because, as far as I understand, the producer would not wait for the message to be consumed by the sink. Instead, the two would work in parallel and the message could be acked prematurely.

Is my understanding of stm-conduit correct?
And what would be the way to merge separate sources into one to have a nice single-stream semantics?

UPDATE: Updated code to a real working AMQP example as requested (however it may be a bit noisier).

UPDATE 2: I think what I am after could be an Alternative instance for conduit sources, so I could do something like `let src = src1 <|> src2`. Is that possible somehow?

Alexey Raga
  • Could you please post a self-contained example? I didn't see where the `Queue` type came from. Is it a TBMQueue? – zakyggaps Feb 17 '16 at 11:27
  • It can be an AMQP queue or it can be a Kafka partition. I don't think it matters that much, but I will update my question with one of these examples. – Alexey Raga Feb 17 '16 at 12:41

2 Answers

Have a look at ZipSource, which is a newtype wrapper whose Applicative lets you combine Sources in the way that you want.

Once you have a ZipSource, you can use sequenceSources to combine the Sources in a Traversable (e.g. a list) into a Source of Traversables.

The only difference to your desired result type is that it is a Source over a Traversable of values, rather than just a single value, but that shouldn't be much of an issue.
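
To make the lock-step behaviour concrete, here is a minimal sketch using `sequenceSources` from `Data.Conduit` (the function built on ZipSource's Applicative). It assumes a conduit version that exports `runConduitPure` and `.|`; the name `zipped` and the input lists are made up for illustration.

```haskell
import Data.Conduit (runConduitPure, sequenceSources, (.|))
import qualified Data.Conduit.List as CL

-- Combine two pure sources element by element.
-- Each output is a list holding one value from every source, and the
-- combined source stops as soon as any input source runs out.
zipped :: [[Int]]
zipped =
  runConduitPure $
    sequenceSources [CL.sourceList [1, 2, 3], CL.sourceList [10, 20]]
      .| CL.consume
-- zipped == [[1, 10], [2, 20]]
```

The trailing `3` from the first source is never emitted, because every output needs one value from each source.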

Will Sewell
  • Wouldn't it still wait for all the sources to emit before producing the next value? For example, if queue "a" has 1000 msg/sec and "b" has 1 msg/sec, I still want the result source to emit 1000 "a" and 1 "b" and so on. – Alexey Raga Feb 17 '16 at 13:07
  • In that case, yes, this would not be applicable because it only produces output once all `Source`s have produced a value. I suggest you read the answers to [this question](http://stackoverflow.com/questions/15594556/fusing-conduits-with-multiple-inputs), which seems to be the same problem you are having. Note that it does look like something conduit is not well suited to. – Will Sewell Feb 17 '16 at 13:54

mergeSources in stm-conduit maintains a TBMChannel behind the scenes. All your Sources / Producers are first connected to the TBMChannel, then it creates a single Source that pulls values out of the channel in FIFO order.

You can set the bound of the intermediate TBMChannel when using mergeSources. Say you set the bound to n. Then the first n values produced by all the Sources will be dumped to the TBMChannel and the AmqpConn immediately, assuming nothing blocks at the AmqpConn end and your consumer is slower than the sources (BTW, AmqpConn uses an unbounded Control.Concurrent.Chan, so it won't block). After that the TBMChannel is full, so any Source trying to yield another value to the channel is blocked. Your consumer takes values one by one out of the combined source, so it's sequential after the first n elements.

To make sure it's sequential from the beginning you can set the bound to 1. However, this may cause some performance issues.
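
The mechanism described above can be sketched without conduit at all. This is not stm-conduit's implementation: it is a hand-rolled illustration that assumes plain lists as "sources" and uses a `TBQueue` from the `stm` package in place of the TBMChannel. The name `mergeLists` is hypothetical.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (forM_)

-- Hypothetical helper: merge several lists through one bounded queue,
-- with one producer thread per list.
mergeLists :: Int -> [[a]] -> IO [a]
mergeLists bound sources = do
  queue   <- newTBQueueIO (fromIntegral bound)
  running <- newTVarIO (length sources)   -- producers still alive
  forM_ sources $ \src -> forkIO $ do
    -- writeTBQueue blocks while the queue already holds `bound` values.
    forM_ src $ \x -> atomically (writeTBQueue queue x)
    atomically (modifyTVar' running (subtract 1))
  let drain acc = do
        next <- atomically $
          (Just <$> readTBQueue queue) `orElse` do
            -- Queue is empty: finish only once every producer is done.
            n <- readTVar running
            if n == 0 then pure Nothing else retry
        case next of
          Just x  -> drain (x : acc)
          Nothing -> pure (reverse acc)
  drain []
```

For example, `mergeLists 1 [[1, 3, 5], [2, 4]]` returns all five values; the order within each input list is preserved, but the interleaving between lists depends on scheduling. Note that in this sketch a write returns as soon as the value is queued, so whatever a producer does next (an ack, say) can still run before the consumer has handled that value, even with a bound of 1.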

zakyggaps
  • This is pretty much what I am currently using, but there is an interesting edge case here. Say I only want to take 3 elements from the result stream, so I use the `=$= take 3` combinator. After that my stream is finished and my resources (connection, channel) are disposed. But since the actual sources are asynchronous, they have no idea about it and still try to consume more messages and put them into the `TMChan`. When they do that I get an exception. Any idea how this can be handled? – Alexey Raga Feb 20 '16 at 03:32
  • @AlexeyRaga I don't think it's possible to deal with that edge case with the functionality of `stm-conduit` alone. The intermediate channel created by `mergeSources` is only closed when all the upstream sources are closed. You may have to pack some finalization code into your source, I guess. I once thought the methods in [another question](http://stackoverflow.com/questions/35141939/how-to-exit-a-conduit-when-using-mergesources/35153141#35153141) would work, but I now doubt that they really do. – zakyggaps Feb 20 '16 at 04:23