With a RabbitMQ direct exchange and Spring AMQP you can implement a topology that has one queue per account, with all queues bound to a single exchange. If you send messages to the exchange with the account name as the routing key and attach a single consumer to all of the queues, the consumer receives the messages round robin across the queues (see "Direct exchanges and load balance").
The problem with this setup is that you might end up with quite a few queues (one per account) and, at least in my implementation (attached below as a simple Spring Boot application), you have to "restart" the consumer each time a new account comes in, because that means there is a new queue to attach the consumer to. I don't know whether this scales or performs very well. Check this post for the maximum number of queues in RabbitMQ and whether this might affect you.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RoundRobin.RoundRobinQueueConfiguration.class)
public class RoundRobin {

    private static final String EXCHANGE = "round-robin-exchange";

    // account(a):task(t) where t holds the expected order of consumption
    private final List<String> tasks = Arrays.asList(
            "a1:t1", "a2:t2", "a3:t3",            // make sure a queue for every account (a) exists
            "a1:t4", "a1:t7", "a1:t9", "a1:t10",  // add "many" tasks (t) for account 1
            "a2:t5", "a2:t8", "a3:t6");           // add further tasks for other accounts, such that a1 has to "wait"

    // names of the queues that have already been declared and attached to the consumer
    private final List<String> declaredQueues = new ArrayList<>();

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Autowired
    private DirectExchange directExchange;

    @Autowired
    private SimpleMessageListenerContainer listenerContainer;

    // Publishes all tasks; consumption happens asynchronously in the listener container.
    @Test
    public void enqueuedTasksAreProcessedRoundRobin() {
        tasks.forEach(task -> {
            String[] accountAndTask = task.split(":");
            declareQueue(accountAndTask[0]);
            // the account name is used as the routing key
            rabbitTemplate.convertAndSend(accountAndTask[0], accountAndTask[1] + " from account " + accountAndTask[0]);
        });
    }

    // Declares and binds a queue for a new account and "restarts" the consumer so that it listens to it.
    private void declareQueue(String routingKey) {
        if (!declaredQueues.contains(routingKey)) {
            Queue queue = new Queue(routingKey);
            rabbitAdmin.declareQueue(queue);
            rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(directExchange).with(routingKey));
            listenerContainer.stop();
            listenerContainer.addQueues(queue);
            listenerContainer.start();
            declaredQueues.add(routingKey);
        }
    }

    @Configuration
    public static class RoundRobinQueueConfiguration {

        @Bean
        public ConnectionFactory connectionFactory() {
            return new CachingConnectionFactory("localhost");
        }

        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setExchange(EXCHANGE); // default exchange for convertAndSend(routingKey, message)
            return template;
        }

        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }

        @Bean
        public DirectExchange directExchange(RabbitAdmin rabbitAdmin) {
            DirectExchange directExchange = new DirectExchange(EXCHANGE);
            rabbitAdmin.declareExchange(directExchange);
            return directExchange;
        }

        @Bean
        public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory, RabbitAdmin rabbitAdmin) {
            Queue queue = new Queue("dummy-queue"); // we need a queue to get the container started...
            rabbitAdmin.declareQueue(queue);
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setMessageListener(new RoundRobinMessageListener());
            container.setQueues(queue);
            container.start();
            return container;
        }
    }

    public static class RoundRobinMessageListener implements MessageListener {

        @Override
        public void onMessage(Message message) {
            System.out.println("Consumed message " + new String(message.getBody()));
        }
    }
}
The number of tasks in this example is arbitrary. I encoded the expected order of consumption in the task names (t1 to t10) so that the output can be checked against our expectations.
The output of the test is:
Consumed message t1 from account a1
Consumed message t2 from account a2
Consumed message t3 from account a3
Consumed message t4 from account a1
Consumed message t5 from account a2
Consumed message t6 from account a3
Consumed message t7 from account a1
Consumed message t8 from account a2
Consumed message t9 from account a1
Consumed message t10 from account a1
which I guess is what you wanted...
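If you want to double-check the expected order without a broker, here is a minimal sketch that models the idealized round robin with plain in-memory queues. It is only a simulation of the fairness the topology aims for, not of how RabbitMQ actually schedules deliveries; the class `RoundRobinSketch` and the helper `drainRoundRobin` are names I made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinSketch {

    // Hypothetical helper: sorts "account:task" entries into one in-memory queue
    // per account, then takes one task per account per pass until all are empty.
    static List<String> drainRoundRobin(List<String> tasks) {
        // LinkedHashMap keeps the accounts in the order they were first seen
        Map<String, Deque<String>> queues = new LinkedHashMap<>();
        for (String task : tasks) {
            String[] accountAndTask = task.split(":");
            queues.computeIfAbsent(accountAndTask[0], k -> new ArrayDeque<>())
                  .add(accountAndTask[1]);
        }

        List<String> consumed = new ArrayList<>();
        boolean deliveredInThisPass = true;
        while (deliveredInThisPass) {
            deliveredInThisPass = false;
            for (Map.Entry<String, Deque<String>> entry : queues.entrySet()) {
                String task = entry.getValue().poll(); // null if this account has nothing left
                if (task != null) {
                    consumed.add(task + " from account " + entry.getKey());
                    deliveredInThisPass = true;
                }
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        // same tasks as in the test above
        List<String> tasks = Arrays.asList(
                "a1:t1", "a2:t2", "a3:t3",
                "a1:t4", "a1:t7", "a1:t9", "a1:t10",
                "a2:t5", "a2:t8", "a3:t6");
        drainRoundRobin(tasks).forEach(m -> System.out.println("Consumed message " + m));
    }
}
```

Running `main` prints the tasks in the order t1 to t10, i.e. the same order as the test output above. Once a2 and a3 run out of tasks, the remaining a1 tasks (t9, t10) are simply delivered back to back.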