1

I have primitive messaging system inside application. Message can be submitted by the producer from one thread and processed by the consumer in another thread - there'are only two threads by the design: one thread for consumer and another for producer, and it's not possible to change this logic.

I'm using ConcurrentLinkedQueue<> implementation to work with messages:

// producer's code (adds the request)
this.queue.add(req);

// consumer's code (busy loop with request polling)
while (true) {
  Request req = this.queue.poll();
  if (req == null) {
    continue;
  }
  if (req.last()) {
    // last request submitted by consumer
    return;
  }
  // function to process the request
  this.process(req);
}

Processing logic is very fast, consumer may receive about X_000_000 requests per second.

But I've discovered using profiler that queue.poll() sometimes is very slow (it seems when queue is receiving a lot of new items from producer) - it's about 10x times slower when receiving a lot of new messages comparing to already filled up queue without adding new items from another thread.

Is it possible to optimize it? What is the best Queue<> implementation for this particular case (one thread for poll() and one thread for add())? Maybe it would be easier to implement some simple queue by-self?

Kirill
  • 7,580
  • 6
  • 44
  • 95
  • 1
    Does this answer your question? [LinkedBlockingQueue vs ConcurrentLinkedQueue](https://stackoverflow.com/questions/1426754/linkedblockingqueue-vs-concurrentlinkedqueue) – Amongalen Jul 20 '20 at 12:25
  • @Amongalen thanks for the link, but I don't think so - `LinkedBlockingQueue` is worse for performance in my case: I tried to use it, but it was more than 2x times slower than non-blocking `ConcurrentQueue`. So both queue implementations are not suitable for my application. – Kirill Jul 20 '20 at 13:01
  • Have you tried SynchronousQueue? This class is used when one thread wants to hand off data to another thread. – Eric Jul 21 '20 at 08:25
  • @Eric And how will the producer be able to add multiple elements to the queue to store them while the comsumer is busy working on one? – akuzminykh Jul 21 '20 at 08:35
  • For what it's worth I've successfully used a circular buffer for this very scenario in the past with relatively little effort. If I recall correctly, all that was needed was a (wrapping) write index, a (wrapping) read index, and a monitor variable for the writer to kick-start the waiting reader. The reader never updates the write index, the writer never updates the read index. -- As I recall, I was using a pre-allocated, fixed-length array. A dynamically resizing queue would likely have been more complicated to do. – 500 - Internal Server Error Jul 21 '20 at 09:41

1 Answers1

3

The consumer is slower while the producer is producing because each time it reads, it experiences a cache miss, since a new element will always be present. If all elements are already present, they can be fetched together, which improves throughput.

When busy-waiting consider using Thread.onSpinWait(): while it adds latency, it also enables certain performance optimizations.

// consumer's code (busy loop with request polling)
while (true) {
  Request req = this.queue.poll();
  if (req == null) {
    Thread.onSpinWait();
    continue;
  }
  if (req.last()) {
    // last request submitted by consumer
    return;
  }
  // function to process the request
  this.process(req);
}

The JDK does not have queues optimized for SPSC (Single-Producer Single-Consumer) scenarios. There are libraries for that. You can use Agrona or JCTools. Implementing these is not easy.

// Agrona
Queue<Request> queue = new OneToOneConcurrentArrayQueue<>(2048);
// JCTools
Queue<Request> queue = new SpscArrayQueue<>(2048);
spongebob
  • 8,370
  • 15
  • 50
  • 83
  • Thanks, I tested all proposed solutions and choose Jctools implementation of `SpscArrayQueue`. It shows the best performance in my application. – Kirill Oct 26 '20 at 09:38