
I have a saga setup in a fork/join configuration.

Events defined on the saga

  • FileMetadataMsg
  • FileReadyMsg
  • SomeOtherMsg

Process starts off when a file comes in on a separate listener.

  • Publishes SagaStart(correlationId)
  • Publishes FileSavedToMsg(correlationId, fileLoc)
  • Publishes FileMetadataMsg(correlationId, metadata)
  • Publishes FileReadyMsg(correlationId, fileLoc)

A downstream endpoint does some work on the file

Consumer<FileSavedToMsg>

  • Publishes SomeOtherMsg(FileSavedToMsg.correlationId, data)
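
As a rough sketch of that worker: the class names, property names, and the placeholder "work" below are illustrative assumptions, not the actual code. The point is only that the consumer copies the incoming correlation id onto the message it publishes.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contracts; the property names are assumptions.
public class FileSavedToMsg
{
    public Guid CorrelationId { get; set; }
    public string FileLoc { get; set; }
}

public class SomeOtherMsg
{
    public Guid CorrelationId { get; set; }
    public string Data { get; set; }
}

// Hypothetical worker consumer: handles FileSavedToMsg and publishes
// SomeOtherMsg tagged with the same correlation id so the saga can match it.
public class FileSavedToMsgConsumer : IConsumer<FileSavedToMsg>
{
    public async Task Consume(ConsumeContext<FileSavedToMsg> context)
    {
        // Placeholder for the real work done on the file.
        var data = $"processed {context.Message.FileLoc}";

        await context.Publish(new SomeOtherMsg
        {
            CorrelationId = context.Message.CorrelationId,
            Data = data
        });
    }
}
```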

I am getting a FileSavedToMsg in the saga_skipped queue. I can only assume it's due to having a correlationId on the FileSavedToMsg because the saga itself is not using FileSavedToMsg in its state machine and does not have an Event<FileSavedToMsg>.

If that is the reason, should I be passing the correlationId along in a field other than CorrelationId, so the saga doesn't see it? I need it somewhere so I can tag SomeOtherMsg with it.

Here is how the saga endpoint is defined

return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
    {
        h.Username("guest");
        h.Password("guest");
    });

    cfg.ReceiveEndpoint(host, "study_saga", epCfg =>
    {
        epCfg.StateMachineSaga(machine, repository);
    });
});

Here is how the worker endpoint is defined

return Bus.Factory.CreateUsingRabbitMq(x =>
{
    var host = x.Host(new Uri("rabbitmq://localhost/"), h =>
    {
        h.Username("guest");
        h.Password("guest");
    });

    x.ReceiveEndpoint(host, "study_3d_volume_worker", c =>
    {
        c.PrefetchCount = 1;
        c.Instance(_studyCreatedMsgConsumer);
    });
});

These are running on the same machine, but in separate Console/Topshelf applications.

phil
  • Are your saga and your consumer on the same queue? They should be on separate receive endpoints with separate queues. It's likely you have a binding on the saga queue from the `FileSavedToMsg` exchange that is causing it. – Chris Patterson Aug 27 '16 at 04:42
  • @ChrisPatterson I have added the endpoint configuration for both the saga and the "worker". – phil Aug 27 '16 at 18:25
  • Can you look at the bindings of the `study_saga` exchange on RabbitMQ and see what exchanges are inbound to it? – Chris Patterson Aug 29 '16 at 22:30
  • @ChrisPatterson Your last comment led to the fix. What happened is that the exchange was created initially with FileSavedToMsg, but after some changes the saga was changed to use FileReadyMsg. The event was removed from the saga and I didn't realize it persisted on the exchange. I reused the FileSavedToMsg on another queue/exchange. Effectively it screwed up the routing and created the skipped queue. If you move your previous comment to an answer I will mark it. Thanks for the help. – phil Aug 30 '16 at 19:41
  • Hopefully helpful to someone in the future. I used the browser-based RabbitMQ management plugin to examine and subsequently delete the offending exchange binding. Without it, I would have had a much harder time realizing that the exchange was still bound to a message type that I wanted removed. – phil Aug 30 '16 at 19:43

1 Answer


If you are getting messages on a queue that are not consumed by any consumer on that receive endpoint, there is probably a stale binding: either you were previously consuming that message type and removed it from the consumer (or saga, in your case), or the queue was used for some other purpose that consumed that message type.

Either way, if you go into the RabbitMQ management console and look for the queue, you can expand the Bindings chevron, click to go to the exchange of the same name (that's a standard MassTransit convention), and then expand the bindings of the exchange to see which message types (the exchanges named like .NET type names) are bound to that exchange.

If you see one that is not consumed by the endpoint, that's the culprit. You can Unbind it using the UI, after which messages published will no longer be sent to the queue.
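
The same inspection can be scripted against the RabbitMQ management HTTP API instead of clicking through the UI. The following is a minimal sketch, assuming the management plugin is listening on localhost:15672, the default vhost `/`, the guest/guest credentials from the question, and a modern .NET runtime (for `System.Text.Json` and for `%2F` being preserved in the request path). `StaleSource` is a hypothetical placeholder for whatever exchange name the listing reveals; it is left empty so nothing is deleted unless you fill it in.

```csharp
using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

public static class BindingInspector
{
    // Assumed defaults; adjust for your broker.
    private const string BaseUrl = "http://localhost:15672/api/";
    private const string VHost = "%2F";            // URL-encoded "/"
    private const string Exchange = "study_saga";  // exchange named after the queue

    // Hypothetical: set to the source exchange you want unbound,
    // exactly as it appears in the listing. Empty means "list only".
    private const string StaleSource = "";

    public static async Task Main()
    {
        using var http = new HttpClient { BaseAddress = new Uri(BaseUrl) };
        var token = Convert.ToBase64String(Encoding.ASCII.GetBytes("guest:guest"));
        http.DefaultRequestHeaders.Authorization =
            new AuthenticationHeaderValue("Basic", token);

        // All bindings in which the study_saga exchange is the destination.
        var json = await http.GetStringAsync(
            $"exchanges/{VHost}/{Exchange}/bindings/destination");

        using var doc = JsonDocument.Parse(json);
        foreach (var binding in doc.RootElement.EnumerateArray())
        {
            var source = binding.GetProperty("source").GetString();
            var props = binding.GetProperty("properties_key").GetString();
            Console.WriteLine($"{source} -> {Exchange} (properties_key: {props})");

            if (StaleSource.Length > 0 && source == StaleSource)
            {
                // Remove the exchange-to-exchange binding.
                var url = $"bindings/{VHost}/e/{Uri.EscapeDataString(source)}" +
                          $"/e/{Exchange}/{Uri.EscapeDataString(props)}";
                var response = await http.DeleteAsync(url);
                response.EnsureSuccessStatusCode();
                Console.WriteLine($"Unbound {source}");
            }
        }
    }
}
```

Run it once with `StaleSource` empty to see what is bound, and compare the listed sources against the events the saga actually declares before deleting anything.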

Chris Patterson
  • What if I have multiple consumers in a Request/Response scenario and the Request fans out to all consumers? Each consumer responds to the same Request, but only the first response works and all the other responses (or messages) end up in the _skipped queue. How can I avoid _skipped queues being created when a consumer responds to a Request that has already received a response? – Ozkan Mar 02 '17 at 13:06