JMS queue should be treated as FIFO queue in general.
The reasons for ordering being spoiled, according to IBM MQ documentation are:
- Multiple destinations
- Multiple producers
- Multiple consumers
- Publish and subscribe (imply multiple instances of a subscription)
Similar statements for ActiveMQ
ActiveMQ will preserve the order of messages sent by a single producer to all consumers on a topic. If there is a single consumer on a queue then the order of messages sent by a single producer will be preserved as well.
If you have multiple consumers on a single queue the consumers will compete for messages and ActiveMQ will load balance across them, so order will be lost.
You need to process messages of the same group by the same thread (sequentially) not to reorder them. Kafka provide you smart partitioning of the message based on message key. ActiveMQ has notion of message groups which utilizes message header.
Consider partitioning example in consumer application using java fair locks if you can't use the above. Message reading from the queue and partition derivation should be synchronous, actual processing could be parallelized.
String message;
String messageKey;
ReentrantLock messageKeyLock;
partitioningSupport.getFairLock().lock();
try {
// use DUPS_OK_ACKNOWLEDGE with deduplication service which improve performance of sequential read
message = (String) jmsTemplate.receiveAndConvert(QUEUE);
if (message == null || deduplicationService.deduplicate(md5(message)))
continue;
messageKey = findByXPath(path, message)
messageKeyLock = partitioningSupport.getPartitionLock(messageKey);
} finally {
partitioningSupport.getFairLock().unlock();
}
messageKeyLock.lock();
try {
// parallel message processing
} finally {
messageKeyLock.unlock();
}
With 10 key diversity (number of unique keys), 10 consumer threads and 255 partitions, locking is notable

With 1000 key diversity and others being the same, locking is rather casual and not notable (probability of waiting is relatively small).

Implementation
import static org.apache.commons.lang3.RandomUtils.nextInt;
import static org.apache.commons.lang3.StringUtils.isBlank;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
public class PartitioningSupport {
private final ConcurrentHashMap<Integer, ReentrantLock> locks = new ConcurrentHashMap<>();
private final ReentrantLock fairLock = new ReentrantLock(true);
private final int diversity;
public PartitioningSupport() {
this(0xff);
}
public PartitioningSupport(int diversity) {
this.diversity = diversity;
}
public ReentrantLock getPartitionLock(String messageKey) {
fairLock.lock();
try {
int partition = partition(messageKey);
ReentrantLock lock = locks.get(partition);
if (lock == null) {
lock = new ReentrantLock(true);
locks.put(partition, lock);
}
return lock;
} finally {
fairLock.unlock();
}
}
private int partition(String key) {
return (isBlank(key) ? nextInt() : key.hashCode()) & diversity;
}
public ReentrantLock getFairLock() {
return fairLock;
}
}
Test
import static java.lang.Integer.parseInt;
import static java.lang.String.format;
import static java.lang.System.out;
import static java.lang.Thread.sleep;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.apache.commons.lang3.RandomUtils.nextInt;
import static org.apache.commons.lang3.RandomUtils.nextLong;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.junit.jupiter.api.Test;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class PartitioningSupportTest {
private BlockingQueue<String> queue = new LinkedBlockingDeque<>();
private List<Future<?>> results = new ArrayList<>();
private ExecutorService consumers = newFixedThreadPool(10, new ThreadFactoryBuilder().setNameFormat("consumer-%s").build());
private PartitioningSupport partitioningSupport = new PartitioningSupport();
private volatile ConcurrentHashMap<String, AtomicInteger> ids;
private int repeatTest = 10;
private int uniqueKeysCount = 1; // 100
private int totalMessagesCount = 1000;
@Test
public void testProcessingOrder() throws InterruptedException, ExecutionException {
for (int testIter = 0; testIter < repeatTest; testIter++) {
ids = new ConcurrentHashMap<>();
results = new ArrayList<>();
for (int messageIter = 1; messageIter <= totalMessagesCount; messageIter++) {
String messageKey = "message-" + nextInt(0, uniqueKeysCount);
ids.putIfAbsent(messageKey, new AtomicInteger());
queue.put(format("%s.%s", messageKey, messageIter));
}
for (int i = 0; i < totalMessagesCount; i++)
results.add(consumers.submit(this::consume));
for (Future<?> result : results)
result.get();
}
consumers.shutdown();
}
private void consume() {
try {
String message;
String messageKey;
ReentrantLock messageKeyLock;
partitioningSupport.getFairLock().lock();
try {
message = queue.take();
messageKey = message.substring(0, message.indexOf('.'));
messageKeyLock = partitioningSupport.getPartitionLock(messageKey);
} finally {
partitioningSupport.getFairLock().unlock();
}
messageKeyLock.lock();
try {
sleep(nextLong(1, 10));
int ordinal = parseInt(message.substring(message.indexOf('.') + 1));
int previous = ids.get(messageKey).getAndSet(ordinal);
out.printf("processed: %s - %s%n", messageKey, ordinal);
assertTrue(ordinal > previous, format("broken order %s [%s -> %s]", messageKey, previous, ordinal));
} finally {
messageKeyLock.unlock();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}