3

I have a scenario with dozens of producer and one single consumer. Timing is critical: for performance reason I want to avoid any locking of producers and I want the consumer to wait as little as possible when no messages are ready.

I've started using a ConcurrentLinkedQueue, but I don't like to call sleep on the consumer when queue.poll() == null because I could waste precious milliseconds, and I don't want to use yield because I end up wasting cpu.

So I came to implement a sort of ConcurrentBlockingQueue so that the consumer can run something like:

T item = queue.poll();
if(item == null) {
    wait();
    item = queue.poll();
}
return item;

And producer something like:

queue.offer(item);
notify();

Unfortunately wait/notify only works on synchronized block, which in turn would drastically reduce producer performance. Is there any other implementation of wait/notify mechanism that does not require synchronization?

I am aware of the risks related to not having wait and notify synchronized, and I managed to resolve them by having an external thread running the following:

while(true) {
    notify();
    sleep(100);
}
Jack
  • 1,488
  • 11
  • 21
  • What's wrong with an actual BlockingQueue? – shmosel Feb 15 '18 at 09:42
  • Probably relevant https://stackoverflow.com/questions/976940/which-java-blocking-queue-is-most-efficient-for-single-producer-single-consumer – Kayaman Feb 15 '18 at 09:44
  • Well, you could implement your own subclass of `ConcurrentBlockingQueue` and register observers on it - or use something like RxJava. – Thomas Feb 15 '18 at 09:45
  • I think the title is a bit misleading though. There is no `wait/notify` without `synchronized`, and that's not really what you (the OP) want. You want low latency. Is this a trading platform? – Kayaman Feb 15 '18 at 09:47

2 Answers2

3

I've started using a ConcurrentLinkedQueue, but I don't like to call sleep on the consumer when queue.poll() == null

You should check the BlockingQueue interface, which has a take method that blocks until an item becomes available.

It has several implementations as detailed in the javadoc, but ConcurrentLinkedQueue is not one of them:

All Known Implementing Classes:
ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, LinkedTransferQueue, PriorityBlockingQueue, SynchronousQueue

assylias
  • 321,522
  • 82
  • 660
  • 783
  • Are you aware of some of this implementation which does not lock the producer on insertion? – Jack Feb 15 '18 at 10:32
  • @Jack You can use the `offer` method which doesn't block and returns true/false to indicate if the item was inserted. If you want the call to always return true, just pick an unbounded queue, such as a `LinkedBlockingQueue` (with no capacity limit). – assylias Feb 15 '18 at 10:40
  • You mean that it doesn't block the caller, but actually I said "lock", and ArrayBlockingQueue.offer() does lock. I haven't checked other implementation, that's why I was asking. – Jack Feb 15 '18 at 10:44
  • @Jack not sure what you mean by lock: offer returns immediately. Check the javadoc of BlockingQueue which has a nice table explaining how the different methods work. The only situation where offer returns false is if the queue is full - if you don't want that to happen, use an unbounded queue. – assylias Feb 15 '18 at 10:50
  • As far as I know `offer()` uses a lock, meaning it can block the producer to wait the free way to insert value into the queue. Doesn't matter if this behavior is transparent for the caller as you are properly pointing out. But as I stated "I want to avoid any locking of producers", that's why all BlockingQueue interface doens't work for me. – Jack Feb 15 '18 at 10:59
  • @Jack I'm not sure what you are referring to: as far as I know BlockingQueue implementations are lock free and offer doesn't use any lock... – assylias Feb 15 '18 at 11:07
  • Lol.. lock-free and blocking cannot really stay on the same sentence. I'm guessing you don't know what you are talking. [Here's for your information](http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/6-b14/java/util/concurrent/ArrayBlockingQueue.java#ArrayBlockingQueue.offer%28java.lang.Object%29) – Jack Feb 15 '18 at 11:14
  • @Jack You're right and I got mixed up - I think we can delete this whole conversation indeed. Sorry about that. – assylias Feb 15 '18 at 11:27
  • This may be relevant: https://stackoverflow.com/questions/1426754/linkedblockingqueue-vs-concurrentlinkedqueue – assylias Feb 15 '18 at 11:28
  • For some reason I thought the BlockingQueues were using the same lock-free systems as CountdownLatch for example - obviously not the case. – assylias Feb 15 '18 at 11:36
  • @Jack don't be mean, people are trying to help you. – Kayaman Feb 15 '18 at 12:23
  • I didn't mean to result arrogant, sorry about that. I found citation to CountdownLatch useful as it opened me to how [CAS](https://en.wikipedia.org/wiki/Compare-and-swap) is used (almost everywhere) in java concurrency, making locking much more painless than I thought – Jack Feb 15 '18 at 14:25
0

I came out with the following implementation:

private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
private final Semaphore semaphore = new Semaphore(0);
private int size;

public void offer(T item) {
    size += 1;
    queue.offer(item);
    semaphore.release();
}

public T poll(long timeout, TimeUnit unit) {
    semaphore.drainPermits();
    T item = queue.poll();
    if (item == null) {
        try {
            semaphore.tryAcquire(timeout, unit);
        } catch (InterruptedException ex) {
        }
        item = queue.poll();
    }
    if (item == null) {
        size = 0;
    } else {
        size = Math.max(0, size - 1);
    }
    return item;
}

/** An inaccurate representation O(1)-access of queue size. */
public int size() {
    return size;
}

With the following properties:

  • producers never go to SLEEP state (which I think can go with BlockingQueue implementations that use Lock in offer(), or with synchronized blocks using wait/notify)
  • consumer only goes to SLEEP state when queue is empty but it is soon woken up whenever a producer offer an item (no fixed-time sleep, no yield)
  • consumer can be sometime woken up even with empty queue, but it's ok here to waste some cpu cycle

Is there any equivalent implementation in jdk that I'm not aware of? Open for criticism.

Jack
  • 1,488
  • 11
  • 21