This question is about consuming messages over AMQP in .NET. The documentation recommends AMQPNetLite: https://access.redhat.com/documentation/en-us/red_hat_amq/7.0/html-single/using_the_amq_.net_client/index

On subscribing to an address using AMQPNetLite, the address and the queue are auto-created. The auto-created queue is always anycast, though. I have not been able to auto-create a queue that

  1. is multicast, and
  2. allows any number of consumers.

Code:

private async Task RenewSession()
{
    Connect = await Connection.Factory.CreateAsync(new Address("amqp://admin:admin@localhost:5672"), new Open() {ContainerId = "client-1"});
    MqSession = new Session(Connect);
    var receiver = new ReceiverLink(MqSession, DEFAULT_SUBSCRIPTION_NAME, GetSource("test-topic"), null);
    receiver.Start(100, OnMessage);
}

private Source GetSource(string address)
{
    var source = new Source
    {
        Address = address,
        ExpiryPolicy = new Symbol("never"),
        Durable = 2,
        DefaultOutcome = new Modified
        {
            DeliveryFailed = true,
            UndeliverableHere = false
        }
    };
    return source;
}

Maybe I am missing some flags?

Praveen Nayak
  • Got the solution to the first part of the question: a multicast queue is requested through the source capabilities, e.g. Capabilities = new[] { new Symbol("topic") } (or "queue" to ask for a queue). – Praveen Nayak Jul 30 '18 at 11:37

1 Answer

In AMQP, you choose between auto-creating a queue (anycast routing) or a topic (multicast routing) by setting a capability on the link's source or target.

The capability should be either new Symbol("queue") or new Symbol("topic").

public class SimpleAmqpTest
{
    [Fact]
    public async Task TestHelloWorld()
    {
        Address address = new Address("amqp://guest:guest@localhost:5672");
        Connection connection = await Connection.Factory.CreateAsync(address);
        Session session = new Session(connection);

        Message message = new Message("Hello AMQP");

        Target target = new Target
        {
            Address = "q1",
            Capabilities = new Symbol[] { new Symbol("queue") }
        };

        SenderLink sender = new SenderLink(session, "sender-link", target, null);
        await sender.SendAsync(message);

        Source source = new Source
        {
            Address = "q1",
            Capabilities = new Symbol[] { new Symbol("queue") }
        };

        ReceiverLink receiver = new ReceiverLink(session, "receiver-link", source, null);
        message = await receiver.ReceiveAsync();
        receiver.Accept(message);

        await sender.CloseAsync();
        await receiver.CloseAsync();
        await session.CloseAsync();
        await connection.CloseAsync();
    }
}

Have a look at https://github.com/Azure/amqpnetlite/issues/286, where the code comes from.

You can choose whether the default routing will be multicast or anycast by setting default-address-routing-type in broker.xml. This is documented at https://activemq.apache.org/artemis/docs/2.6.0/address-model.html
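As a rough illustration of that setting, a broker.xml fragment could look like the one below. This is a sketch rather than a tested configuration: the catch-all match="#" is an assumption chosen for the example, and the fragment is assumed to sit inside the broker's <core> element. Only default-address-routing-type comes from the answer itself.

```xml
<!-- Sketch only: goes inside <core> in broker.xml -->
<address-settings>
   <!-- match="#" (every address) is an assumption for this example -->
   <address-setting match="#">
      <!-- Auto-created addresses default to multicast instead of anycast -->
      <default-address-routing-type>MULTICAST</default-address-routing-type>
   </address-setting>
</address-settings>
```

A narrower match pattern would limit the change to a subset of addresses and leave the default for everything else.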

The broker's multicastPrefix and anycastPrefix features are not implemented for AMQP; see https://issues.jboss.org/browse/ENTMQBR-795

user7610
  • I figured out this same detail just a while back and added it as a comment to the question. Would you know the answer to the second part of the question: how to create the multicast queue so that it allows max-consumers of -1 (any number)? Note: the address allows multicast queues, yes, but max consumers on the one multicast queue is set to 1 – Praveen Nayak Jul 30 '18 at 15:26
  • @PraveenNayak Not really. AFAIK you cannot autocreate it that way, maybe you can set it up in broker.xml, though, or through management. What I would ask is _why_ do you want to set it. – user7610 Jul 30 '18 at 15:56
  • The idea with multicast routing is that you have one address, and each consumer gets its own subscription queue under that address, where the messages for it are delivered. So even though there is only one consumer on each subscription queue, there is a separate subscription queue for each consumer, and therefore there are multiple consumers on the address, no problem. – user7610 Jul 30 '18 at 15:58
  • The need is to be able to use the broker for pub-sub. The publisher publishes an "OrderPlaced" to a multicast address. Two subscribers, "InventoryMgmt" and "Billing", subscribe to it with 1 queue each. We would like to distribute the "Billing" workload by adding 3 instances of the "Billing" consumer, while "InventoryMgmt" has 1 consumer. Adding 3 "Billing" consumers on 3 queues will cause all 3 consumers to get a copy of the message each. Having all 3 consumers work off the one "Billing" queue will ensure the workload is distributed among the three, maybe round robin. I hope this makes sense. – Praveen Nayak Jul 30 '18 at 17:31
  • I had a detailed question here: https://stackoverflow.com/questions/51559207/amq-address-with-multiple-clients-to-a-multicast-queue#51582014, just to confirm that such a setting is indeed possible in Red Hat AMQ – Praveen Nayak Jul 30 '18 at 18:28
  • 1
    @PraveenNayak I think I understand. Using the JMS terminology, it sounds exactly like the "shared subscription" feature, http://jmesnil.net/weblog/2013/06/27/jms-20-shared-subscription/. You can achieve that by setting the `"shared"` (and possibly also `"global"`) capabilities. Also, set name of the subscription in receiver options, e.g. `opts.name("sub-1"); // A stable link name` (that would be in qpid-proton-cpp, according to example linked from first comment at https://issues.jboss.org/browse/ENTMQCL-580). Messages will get distributed among receivers that all set the same name. – user7610 Jul 30 '18 at 19:53
  • setting the "shared" capability did it for me, thanks for the help – Praveen Nayak Jul 30 '18 at 20:26
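
For later readers, the resolution from the comments above might look roughly like this in AMQPNetLite. It is a minimal, untested sketch and not code posted in the thread. The class and helper names, the link name "billing", the credit of 100 and the containerId parameter are illustrative assumptions. The capability symbols "topic", "shared" and "global" are the ones named in the comments, and the connection call mirrors the one in the question.

```csharp
using System;
using System.Threading.Tasks;
using Amqp;
using Amqp.Framing;
using Amqp.Types;

public static class SharedSubscriptionSketch
{
    // Hypothetical helper: a source that asks the broker for multicast
    // routing ("topic") and a shared subscription ("shared", "global").
    public static Source SharedTopicSource(string address)
    {
        return new Source
        {
            Address = address,
            Capabilities = new[]
            {
                new Symbol("topic"),   // multicast routing, as in the answer
                new Symbol("shared"),  // let several receivers share one subscription
                new Symbol("global")   // suggested as optional in the comments
            }
        };
    }

    // Hypothetical worker start-up; run one per "Billing" instance.
    public static async Task<ReceiverLink> StartBillingWorkerAsync(string containerId)
    {
        Connection connection = await Connection.Factory.CreateAsync(
            new Address("amqp://admin:admin@localhost:5672"),
            new Open() { ContainerId = containerId });
        Session session = new Session(connection);

        // Every instance uses the same stable link name ("billing" is an
        // assumption), which names the subscription the receivers share.
        ReceiverLink receiver = new ReceiverLink(
            session, "billing", SharedTopicSource("test-topic"), null);

        receiver.Start(100, (link, message) =>
        {
            Console.WriteLine(message.Body);
            link.Accept(message);
        });
        return receiver;
    }
}
```

Going by the comments, several workers started this way with the same link name should split the "Billing" messages between them, while a consumer attached under a different link name (for example one for "InventoryMgmt") would still receive its own copy of each message.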