Is it possible to move a failed message from the error queue back to its original queue, either programmatically or via a UI?

Update

Questions about the code below:

1. Does the code below apply to the publisher, the subscriber, or both?

The code:

Configure.With(activator)
    .Transport(t => (...)) //< use queue "error" here
    .Routing(r =>
    {
        r.AddTransportMessageForwarder(async transportMessage =>
        {
            var sourceQueue = transportMessage.Headers.TryGetValue(Headers.SourceQueue, out var result)
                ? result
                : throw new ArgumentException($"Could not find '{Headers.SourceQueue}' header");

            return ForwardAction.ForwardTo(sourceQueue);
        });
    })
    .Start();

2. The Transport calls below work in my code. However, the code above suggests using the error queue name as the input queue. Will that work?

If the code above is used, where are the publisher and subscriber queue names (as shown below) specified?

Please provide code for the pub/sub pattern.

Publisher:

.Transport(t => t.UseAzureServiceBus(Consts.ServiceBusConnectionString, Consts.Publisher))

Subscriber:

.Transport(t=>t.UseAzureServiceBus(Consts.ServiceBusConnectionString, Consts.Subscriber1))

https://github.com/rebus-org/Rebus/wiki/Transport-message-forwarding

Pingpong

1 Answer

Since Rebus uses ordinary queues as its dead-letter queues, it's quite easy to start a bus instance with error as its input queue. You can then use Rebus' built-in transport message forwarding capability to do what you want with the messages, e.g. forward them to their source queues:

Configure.With(activator)
    .Transport(t => (...)) //< use queue "error" here
    .Routing(r =>
    {
        r.AddTransportMessageForwarder(async transportMessage =>
        {
            var sourceQueue = transportMessage.Headers.TryGetValue(Headers.SourceQueue, out var result)
                ? result
                : throw new ArgumentException($"Could not find '{Headers.SourceQueue}' header");

            return ForwardAction.ForwardTo(sourceQueue);
        });
    })
    .Start();

or whatever you want in there.
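
As one illustration of "whatever you want", here is a minimal sketch of a forwarder that does not throw when the source queue header is missing, but parks the message in a separate queue instead. The class name, the method name and the `error-unroutable` queue are hypothetical and not part of Rebus; the queue would have to exist in your setup.

```csharp
using System.Threading.Tasks;
using Rebus.Messages;
using Rebus.Routing.TransportMessages;

public static class ErrorQueueForwarding
{
    // Hypothetical parking queue for messages that carry no source queue header.
    const string UnroutableQueue = "error-unroutable";

    // Decides where a message received from the error queue should go.
    public static Task<ForwardAction> ReturnToSource(TransportMessage transportMessage)
    {
        // Use the source queue header when present, otherwise fall back to the parking queue.
        var destination = transportMessage.Headers.TryGetValue(Headers.SourceQueue, out var sourceQueue)
            ? sourceQueue
            : UnroutableQueue;

        return Task.FromResult(ForwardAction.ForwardTo(destination));
    }
}
```

It should be possible to plug this in with `r.AddTransportMessageForwarder(ErrorQueueForwarding.ReturnToSource)` in place of the lambda above.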

There is also a UI, Fleet Manager, that can do this. It replaces the need for dead-letter queues entirely: it stores failed messages in its database and makes it possible to return them to their source queues (or to another queue, if that's what you want). It is only available to Rebus Pro subscribers, though.


Update (with answers to the questions in your update):

1) AddTransportMessageForwarder is only relevant for an endpoint that receives messages.

2) It's the "queue name" passed as an argument to the transport's .UseXxx method. For example, with Azure Service Bus it would read:

.Transport(t => t.UseAzureServiceBus(Consts.ServiceBusConnectionString, "error"))
mookid8000
  • Where should the line with ForwardTo("") be called? Fleet Manager uses a database for dead letters; can it use an ordinary queue for this purpose? – Pingpong May 23 '19 at 10:20
  • How do I specify the name of the error queue that the `ForwardTo` method reads from? Please provide more code. – Pingpong May 23 '19 at 10:49
  • The `ForwardTo` method does not read anything. I'm suggesting you start a bus instance with your dead-letter queue (e.g. `error` as its _input queue_ == `.Transport(t => t.UseWhatever("error"))`), and then implement a handler that forwards messages. I've updated the answer to include some code that uses Rebus' built-in ability to [forward transport messages](https://github.com/rebus-org/Rebus/wiki/Transport-message-forwarding) – mookid8000 May 23 '19 at 12:28
  • I added questions to the Update section of the original post. Please have a look. – Pingpong May 24 '19 at 11:35
  • Does the code `.Transport(t => t.UseAzureServiceBus(Consts.ServiceBusConnectionString, "error"))` replace `.Transport(t=>t.UseAzureServiceBus(Consts.ServiceBusConnectionString, Consts.Subscriber1))`? or can both exist with separate Configure.With(activator)? Can both exist in the same process? – Pingpong May 26 '19 at 23:56
  • You can have as many bus instances as you want in the same process. And no, it does not replace your subscriber – my idea with receiving from the `error` queue was that that would make it possible for you to write some code that routes the message back to its source queue. – mookid8000 May 27 '19 at 06:26
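
To make the last comment concrete, here is a rough sketch of two bus instances running side by side in one process: the regular subscriber, and a second instance that receives from `error` and forwards messages back to their source queues. The connection string, the queue name `subscriber1` (standing in for `Consts.Subscriber1`) and the `string` message type are placeholders, and the snippet assumes the Rebus and Rebus.AzureServiceBus packages are installed.

```csharp
using System;
using System.Threading.Tasks;
using Rebus.Activation;
using Rebus.Config;
using Rebus.Messages;
using Rebus.Routing.TransportMessages;

public static class Program
{
    // Placeholders: substitute your own connection string and queue name.
    const string ConnectionString = "<your Azure Service Bus connection string>";
    const string SubscriberQueue = "subscriber1";

    public static void Main()
    {
        using (var subscriberActivator = new BuiltinHandlerActivator())
        using (var errorActivator = new BuiltinHandlerActivator())
        {
            // Bus 1: the normal subscriber, receiving from its own input queue.
            subscriberActivator.Handle<string>(message =>
            {
                Console.WriteLine($"Got message: {message}");
                return Task.CompletedTask;
            });

            Configure.With(subscriberActivator)
                .Transport(t => t.UseAzureServiceBus(ConnectionString, SubscriberQueue))
                .Start();

            // Bus 2: receives from "error" and returns each message to its source queue.
            Configure.With(errorActivator)
                .Transport(t => t.UseAzureServiceBus(ConnectionString, "error"))
                .Routing(r => r.AddTransportMessageForwarder(transportMessage =>
                {
                    var sourceQueue = transportMessage.Headers.TryGetValue(Headers.SourceQueue, out var result)
                        ? result
                        : throw new ArgumentException($"Could not find '{Headers.SourceQueue}' header");

                    return Task.FromResult(ForwardAction.ForwardTo(sourceQueue));
                }))
                .Start();

            Console.WriteLine("Press ENTER to quit");
            Console.ReadLine();
        }
    }
}
```

One thing to keep in mind with this layout: a message that fails again after being forwarded will presumably end up in `error` once more and be forwarded again, so it may be safer to run the forwarding instance only on demand rather than leaving it running permanently.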