Not sure if I'm just being dense, but I am trying to configure Spring (without XML configuration) to connect to RabbitMQ. It works fine when I create a single SimpleMessageListenerContainer to consume messages from my queues, but where I'm stumbling is how to create a group of containers that all listen on the same queue. My understanding is that you get one container with one message listener per queue, but that you can configure multiple containers to listen to the same queue. My objective is to implement the work queue strategy that the RabbitMQ docs outline (one queue, multiple workers).
What I'm attempting to achieve is one message listener/worker per message delivery (prefetch = 1) and a pool of listener containers bound to a single queue, which together form my pool of workers. Ultimately this all needs to be dynamic, because each instance of my service has to be tuned to the amount of work it is expected to do. Some instances may only need a couple of workers and others may need many. This is why declaring a bean for each container doesn't really work in my case: I don't know ahead of time how many I'll need. Plus it seems funky to have to do something like this (which is what most of the examples I've seen do):
@Bean
SimpleMessageListenerContainer container1() {...}
@Bean
SimpleMessageListenerContainer container2() {...}
@Bean
SimpleMessageListenerContainer container3() {...}
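To make the goal concrete, here is a minimal plain-JDK sketch of the pattern I'm after: one shared queue, a pool of workers whose size is only known at runtime, and each worker holding a single message at a time (roughly what prefetch = 1 gives you). It does not touch RabbitMQ or Spring AMQP at all. WorkerPoolSketch, process and the POISON sentinel are names made up for illustration, and the in-memory BlockingQueue is only a stand-in for the broker queue.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK analogy of "one queue, many workers". Not Spring AMQP code.
class WorkerPoolSketch {

    // Hypothetical sentinel that tells a worker to stop; assumes no real message has this value.
    private static final String POISON = "__stop__";

    /** Starts workerCount workers on one shared queue and returns how many messages they handled. */
    static int process(int workerCount, List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger handled = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workerCount);

        // The worker count is a runtime value, like maxWorkers in my configuration.
        for (int i = 0; i < workerCount; i++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        // Each worker takes exactly one message at a time.
                        String message = queue.take();
                        if (POISON.equals(message)) {
                            return;
                        }
                        handled.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        try {
            queue.addAll(messages);
            // One sentinel per worker, queued after the real messages.
            for (int i = 0; i < workerCount; i++) {
                queue.put(POISON);
            }
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return handled.get();
    }

    public static void main(String[] args) {
        int handled = process(4, List.of("a", "b", "c", "d", "e"));
        System.out.println("handled " + handled + " messages");
    }
}
```

The point of the sketch is only that the number of workers is a plain integer chosen at startup, which is exactly what I can't express with one @Bean method per container.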
Below is my current code. I register a bean that returns a collection of containers. The bean method is called automatically and creates all the containers as expected, but the containers never seem to auto-start (I'm pretty sure I understand why: I'm creating the containers manually). I've tried calling start() on each container after creating it, but there is a 60-second delay on each container when this little piece of code is reached:
AsyncMessageProcessingConsumer
private FatalListenerStartupException getStartupException() throws TimeoutException, InterruptedException {
    this.start.await(60000L, TimeUnit.MILLISECONDS);
    return this.startupException;
}
I did, however, see one post that mentioned needing to implement the SmartLifecycle interface. I could certainly do this on my configuration class below and then call start() on each container I've created when my own start() method is invoked. Is this my only option, or am I just completely missing something here?
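For clarity, this is roughly the delegation I have in mind, written as a plain-Java sketch so it stands alone. Startable, FakeContainer and ContainerGroup are hypothetical stand-ins that I made up; they are not Spring types. In the real thing, the group would implement Spring's SmartLifecycle and the members would be the SimpleMessageListenerContainer instances.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of "one lifecycle object that starts and stops a dynamic group of containers".
class LifecycleGroupSketch {

    /** Hypothetical stand-in for a listener container's lifecycle methods; not a Spring interface. */
    interface Startable {
        void start();
        void stop();
        boolean isRunning();
    }

    /** Hypothetical fake container that only records whether it was started. */
    static final class FakeContainer implements Startable {
        private volatile boolean running;

        @Override
        public void start() { running = true; }

        @Override
        public void stop() { running = false; }

        @Override
        public boolean isRunning() { return running; }
    }

    /** Owns a group whose size is decided at runtime and fans each lifecycle call out to every member. */
    static final class ContainerGroup implements Startable {
        private final List<Startable> members = new ArrayList<>();

        ContainerGroup(int size) {
            for (int i = 0; i < size; i++) {
                members.add(new FakeContainer());
            }
        }

        @Override
        public void start() { members.forEach(Startable::start); }

        @Override
        public void stop() { members.forEach(Startable::stop); }

        /** The group counts as running only when it is non-empty and every member is running. */
        @Override
        public boolean isRunning() {
            return !members.isEmpty() && members.stream().allMatch(Startable::isRunning);
        }

        int size() { return members.size(); }
    }

    public static void main(String[] args) {
        ContainerGroup group = new ContainerGroup(3);
        group.start();
        System.out.println("running after start: " + group.isRunning());
        group.stop();
        System.out.println("running after stop: " + group.isRunning());
    }
}
```

Whether the framework would then call start() on such a group at the right point in context startup is exactly the part I'm unsure about.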
My Rabbit Configuration Class:
@Configuration
public class RabbitConfiguration {

    private final List<SimpleMessageListenerContainer> listenerContainers;
    private final boolean useDurableQueues;
    private final String exchangeName;
    private final String inboundQueueName;
    private final String outboundQueueName;
    private final String host;
    private final String virtualHost;
    private final String userName;
    private final String password;
    private final int maxWorkers;

    public RabbitConfiguration(
            @Value( "${com.unwiredrev.remotelink.rabbitmq.exchangeName}" )
            final String exchangeName,
            @Value( "${com.unwiredrev.remotelink.rabbitmq.inboundQueueName}" )
            final String inboundQueueName,
            @Value( "${com.unwiredrev.remotelink.rabbitmq.outboundQueueName}" )
            final String outboundQueueName,
            @Value( "${com.unwiredrev.remotelink.rabbitmq.priorityQueueName}" )
            final String priorityQueueName,
            @Value( "${com.unwiredrev.remotelink.rabbitmq.username}" )
            final String userName,
            @Value( "${com.unwiredrev.remotelink.rabbitmq.password}" )
            final String password,
            @Value( "${com.unwiredrev.remotelink.rabbitmq.host}" )
            final String host,
            @Value( "${com.unwiredrev.remotelink.rabbitmq.virtualHost}" )
            final String virtualHost,
            @Value( "${com.unwiredrev.remotelink.rabbitmq.useDurableQueues:true}" )
            final boolean useDurableQueues,
            @Value( "${com.unwiredrev.remotelink.rabbitmq.maxWorkers:1}" )
            final int maxWorkers ) {
        this.exchangeName = exchangeName;
        this.inboundQueueName = inboundQueueName;
        this.outboundQueueName = outboundQueueName;
        this.useDurableQueues = useDurableQueues;
        this.host = host;
        this.virtualHost = virtualHost;
        this.userName = userName;
        this.password = password;
        this.maxWorkers = maxWorkers;
        this.listenerContainers = new ArrayList<>( maxWorkers );
    }

    public String getExchangeName() {
        return exchangeName;
    }

    public String getInboundQueueName() {
        return inboundQueueName;
    }

    public String getOutboundQueueName() {
        return outboundQueueName;
    }

    public boolean useDurableQueues() {
        return useDurableQueues;
    }

    public String getHost() {
        return host;
    }

    public Optional<String> getVirtualHost() {
        return Optional.ofNullable( StringUtils.trimToNull( this.virtualHost ) );
    }

    public String getUserName() {
        return userName;
    }

    public String getPassword() {
        return password;
    }

    public int getMaxWorkers() {
        return maxWorkers;
    }

    @Bean
    Collection<SimpleMessageListenerContainer> messageListenerContainers( @NotNull ConnectionFactory connectionFactory, @NotNull RabbitAdmin rabbitAdmin ) {
        IntStream.range( 0, getMaxWorkers() ).forEach( idx -> {
            final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setQueueNames( inboundQueueName );
            container.setConnectionFactory( connectionFactory );
            container.setMessageListener( new MessageReceiver() );
            container.setAcknowledgeMode( AcknowledgeMode.MANUAL );
            container.setRabbitAdmin( rabbitAdmin );
            listenerContainers.add( container );
        } );
        return listenerContainers;
    }

    @Bean
    RabbitAdmin rabbitAdmin( @NotNull ConnectionFactory connectionFactory ) {
        return new RabbitAdmin( connectionFactory );
    }

    @Bean
    ConnectionFactory connectionFactory() {
        final CachingConnectionFactory connectionFactory = new CachingConnectionFactory( getHost() );
        connectionFactory.setUsername( getUserName() );
        connectionFactory.setPassword( getPassword() );
        getVirtualHost().ifPresent( connectionFactory::setVirtualHost );
        return connectionFactory;
    }

    @Bean( name = "rlClientExchange" )
    TopicExchange getExchange() {
        return new TopicExchange( getExchangeName(), true, false );
    }

    @Bean( name = "rlClientInboundQueue" )
    Queue inboundQueue() {
        return new Queue( getInboundQueueName(), useDurableQueues() );
    }

    @Bean
    Queue outboundQueue() {
        return new Queue( getOutboundQueueName(), useDurableQueues() );
    }

    @Bean
    Binding inboundQueueBinding( @NotNull final TopicExchange exchange ) {
        if( exchange == null ) {
            throw new IllegalArgumentException( "Rabbit topic exchange cannot be null." );
        }
        return BindingBuilder.bind( inboundQueue() )
                .to( exchange )
                .with( getInboundQueueName() );
    }

    @Bean
    Binding outboundQueueBinding( @NotNull final TopicExchange exchange ) {
        if( exchange == null ) {
            throw new IllegalArgumentException( "Rabbit topic exchange cannot be null." );
        }
        return BindingBuilder.bind( outboundQueue() )
                .to( exchange )
                .with( getOutboundQueueName() );
    }
}