
I would like to know the best mechanism for implementing a multiple-producer, single-consumer scenario in which I have to keep the current number of unprocessed requests up to date.

My first thought was to use ConcurrentLinkedQueue:

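import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class SomeQueueAbstraction {

    private Queue<SomeObject> concurrentQueue = new ConcurrentLinkedQueue<>();
    private int size;

    public void add(Object request) {
        SomeObject object = convertIncomingRequest(request);
        concurrentQueue.add(object);
        size++;   // separate, non-atomic step: races with other producers and the consumer
    }

    public SomeObject getHead() {
        SomeObject object = concurrentQueue.poll();
        if (object != null) {   // poll() returns null when the queue is empty
            size--;
        }
        return object;
    }

    // other methods
}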

The problem with this is that I have to explicitly synchronize the add() and size++ pair, as well as the poll() and size-- pair, to keep the size accurate at all times, which makes ConcurrentLinkedQueue pointless to begin with.

What would be the best way to get the best possible performance while maintaining data consistency?

Should I use an ArrayDeque instead and synchronize explicitly, or is there a better way to achieve this?

There is a somewhat similar question and answer here:

java.util.ConcurrentLinkedQueue

where it is discussed how composite operations on ConcurrentLinkedQueue are naturally not atomic, but there is no direct answer as to the best option for this scenario.

Note: I am tracking the size explicitly because the built-in .size() method is O(n).
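For comparison, the O(n) cost is specific to ConcurrentLinkedQueue, whose size() traverses the nodes. Below is a minimal sketch using java.util.concurrent.LinkedBlockingQueue, a standard-library queue the thread does not mention. In the OpenJDK implementation its size() reads an internal atomic counter, and producers and the consumer use separate locks (a put lock and a take lock), so offers and polls mostly do not contend with each other; producers still share the put lock among themselves. The class and method names (RequestQueue, submit, next, nextOrNull, pending) are hypothetical, and String stands in for the real request type.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical wrapper; assumes String is an acceptable stand-in for the request type.
class RequestQueue {
    // Unbounded queue: offer() never blocks or fails for lack of capacity.
    private final BlockingQueue<String> requests = new LinkedBlockingQueue<>();

    // Called by any producer thread.
    void submit(String request) {
        requests.offer(request);
    }

    // Called by the single consumer; blocks until a request is available.
    String next() throws InterruptedException {
        return requests.take();
    }

    // Non-blocking variant for the consumer: returns null if the queue is empty.
    String nextOrNull() {
        return requests.poll();
    }

    // Number of unprocessed requests; may be stale as soon as it is returned.
    int pending() {
        return requests.size();
    }
}
```

Whether this is faster than the alternatives depends on the workload, so it is worth measuring alongside them.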

Note 2: I am also worried that a getSize() method (which I haven't written out here) will add even more contention overhead, since it could be called relatively frequently.

I am looking for the most efficient way to handle multiple producers and a single consumer with frequent getSize() calls.

Alternative suggestion: if SomeObject had an elementId field, I could derive the current size from the element returned by ConcurrentLinkedQueue.poll(), and the only locking needed would be inside the mechanism that generates those ids. add and poll could then be used without additional locking. How would this fare as an alternative?
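A variant of the elementId idea, sketched under assumptions. Ids handed out by concurrent producers are not necessarily enqueued in id order (producer A can draw id 5, producer B id 6, and B can enqueue first), so the id of the polled element is not an exact position. Two monotonically increasing counters avoid that: producers bump a shared AtomicLong, the single consumer bumps its own volatile counter, and the size is the difference. CountingQueue is a hypothetical name, and the sketch assumes exactly one thread ever calls poll().

```java
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: size = (items ever added) - (items ever removed).
// Assumes exactly one consumer thread calls poll().
class CountingQueue<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    // Shared by all producers; bumped before the element is enqueued,
    // so it is never behind the consumer's count.
    private final AtomicLong added = new AtomicLong();
    // Written only by the single consumer; volatile so other threads see updates.
    private volatile long removed;

    public void add(T item) {
        Objects.requireNonNull(item, "item"); // reject null before touching the counter
        added.incrementAndGet();
        queue.add(item);
    }

    // Single-consumer only: the plain read-then-write on 'removed' is safe
    // because no other thread ever writes it.
    public T poll() {
        T item = queue.poll();
        if (item != null) {
            removed = removed + 1;
        }
        return item;
    }

    public long size() {
        // Read 'removed' first: 'added' can only grow afterwards, so the
        // result is never negative, though it may include in-flight adds.
        long r = removed;
        return added.get() - r;
    }
}
```

Because the producer counter is incremented before the element is enqueued, size() can briefly count an element that is still being added, but it never goes negative.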

John
  • Does the size() method always have to be _exactly_ correct? (Hint: What happens if thread A calls size(), and gets back an exactly correct answer, but then while thread A is making some decision based on the size, some other thread B sneaks in and changes the size?) – Solomon Slow Mar 23 '15 at 18:51
  • Since you are trying to keep the number of unprocessed requests low, why do you care if size is O(n)? How often do you need to know the size? – NamshubWriter Mar 23 '15 at 20:29

2 Answers


You can use an explicit lock, which means you probably won't need a concurrent queue.

import java.util.LinkedList;
import java.util.Queue;

public class SomeQueueAbstraction {

    private final Queue<SomeObject> queue = new LinkedList<>();
    private volatile int size;
    private final Object lock = new Object();

    public void add(Object request) {
        SomeObject object = convertIncomingRequest(request);
        synchronized (lock) {
            queue.add(object);
            size++;
        }
    }

    public SomeObject getHead() {
        synchronized (lock) {
            SomeObject object = queue.poll();
            if (object != null) {   // poll() returns null on an empty queue
                size--;
            }
            return object;
        }
    }

    public int getSize() {
        synchronized (lock) {
            return size;
        }
    }

    // other methods
}

This way, adding or removing an element and updating the size happen as a single step under the lock, so no thread can observe the queue and the counter out of sync.

jadhachem
  • This is roughly what I proposed as an alternative, only with ArrayDeque instead of LinkedList. Is there any benefit to using LinkedList over ArrayDeque, which should be faster? What worries me is high contention, especially since getSize could be called frequently. – John Mar 23 '15 at 18:35
  • My bad, that part slipped my attention. The point is that you want to avoid a situation where thread A adds to the queue, thread B reads `size`, and only THEN thread A updates `size`. The lock prevents such situations. Also, I think it is better to make `size` a `volatile` variable (see my new edit). – jadhachem Mar 23 '15 at 18:40
  • If you make size volatile, getSize does not strictly speaking have to be synchronized. As is, having it volatile is redundant. – Necreaux Mar 23 '15 at 18:45
  • Would volatile make it safe against race conditions too? I believe it covers visibility issues, but getSize could still be called while an element is being added. – John Mar 23 '15 at 19:31
  • Making `size` volatile would ensure that readers see the most recently updated value even if they are not in a synchronized block (but the value that was read could be out of date a microsecond later). You still need to update size in a synchronized block because you are doing a read-modify-write. – NamshubWriter Mar 23 '15 at 20:26
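Combining the last two comments with the ArrayDeque John asked about, here is a minimal sketch (the class name is hypothetical): every change to the deque and the counter happens under one lock, while getSize() is a single volatile read that takes no lock, so frequent size queries do not contend with producers or the consumer. ArrayDeque is not thread-safe on its own; here it is only touched while holding the lock.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: locked writes, lock-free volatile reads of the size.
class LockedDequeQueue<T> {
    private final Deque<T> deque = new ArrayDeque<>(); // only accessed under 'lock'
    private final Object lock = new Object();
    // Only written while holding 'lock'; volatile so unlocked readers see the latest write.
    private volatile int size;

    public void add(T item) {
        synchronized (lock) {
            deque.addLast(item); // ArrayDeque rejects null with a NullPointerException
            size = size + 1;
        }
    }

    public T poll() {
        synchronized (lock) {
            T item = deque.pollFirst(); // null if the deque is empty
            if (item != null) {
                size = size - 1;
            }
            return item;
        }
    }

    public int getSize() {
        // No lock: one volatile read, possibly stale by the time the caller uses it.
        return size;
    }
}
```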

So the requirement is to report an up-to-date count of unprocessed requests, and since it is requested often, ConcurrentLinkedQueue.size() is indeed unsuitable.

This can be done with an AtomicInteger: its updates are lock-free and fast, and its value is always as close to the current number of unprocessed requests as possible.

Here is an example; note the small changes that keep the reported size accurate:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class SomeQueueAbstraction {

    private final Queue<SomeObject> concurrentQueue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger size = new AtomicInteger();

    public boolean add(Object request) {

        SomeObject object = convertIncomingRequest(request);   
        if (concurrentQueue.add(object)) {
            size.incrementAndGet();
            return true;
        }
        return false;
    }

    public SomeObject remove() {

        SomeObject object = concurrentQueue.poll();
        if (object != null) {
            size.decrementAndGet();
        }
        return object;
    }

    public int getSize() { return size.get(); }

    private SomeObject convertIncomingRequest(Object request) { 
        return new SomeObject(getSize()); 
    }

    class SomeObject {
        int id;
        SomeObject(int id) { this.id = id; }
    }
}
vanOekel
  • So this would be very fast thanks to the atomic implementation, but there is a slight chance that the value is not absolutely accurate at every given moment? That might be good enough for reporting. – John Mar 23 '15 at 20:37
  • @user3360241 Yes. Imagine 1 element in the queue and 2 threads adding an element and 1 thread removing an element _at the same time_. For a short moment the size is anywhere between 0 and 3. But the atomic part of the integer will guarantee that in this short moment the reported size is never less than 0 and never more than 3. After the threads are done the reported size is 2. – vanOekel Mar 23 '15 at 22:01
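A small multi-threaded check of that last point, using a generic copy of the AtomicInteger pattern from this answer (SizedQueue, SizedQueueDemo and fillConcurrently are hypothetical names): several producer threads add concurrently, and once they have all been joined, the reported size matches the number of elements added.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Generic version of the AtomicInteger pattern from the answer (hypothetical name).
class SizedQueue<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger size = new AtomicInteger();

    void add(T item) {
        queue.add(item);
        size.incrementAndGet();
    }

    T remove() {
        T item = queue.poll();
        if (item != null) {
            size.decrementAndGet();
        }
        return item;
    }

    int getSize() {
        return size.get();
    }
}

class SizedQueueDemo {
    // Runs 'producers' threads that each add 'perProducer' items, waits for all
    // of them to finish, and returns the size reported afterwards.
    static int fillConcurrently(SizedQueue<Integer> q, int producers, int perProducer)
            throws InterruptedException {
        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    q.add(i);
                }
            });
            threads[p].start();
        }
        for (Thread t : threads) {
            t.join(); // join() makes each producer's updates visible to this thread
        }
        return q.getSize();
    }
}
```

One caveat on transient values: because add() enqueues before incrementing, a consumer that polls a freshly added element before the producer's increment runs can briefly drive the counter to -1 when the queue started empty. Incrementing before enqueuing avoids negative readings at the cost of briefly over-reporting.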