2

I want have a Producer Consumer Problem where only the newest Item shall be consumed. This problem may have a different name, but I couldn't figure it out!

The producer thread(s) produce elements in a non-blocking fashion by overriting any old items. The single consumer thread should wait for an element to be created and consume it.

I thought about using a blocking queue but the java implementation does not allow for overriding old elements. A circular buffer (like from the commons libary) doesn't work either because its not blocking for the consumer.

Is there a datastructure that serves this purpose or do I need to find a better way?

It might also be possible to solve this with low level synchronization tools like locks but I couldn't figure out how to do it.

Benedikt Bünz
  • 648
  • 7
  • 22
  • elements in a non-blocking fashion by overriting any old items.. .......please clarify over this statement . do you need to have a storage where there will be only a single item in the datastructure....if you want to have a scenario where producer produces and consumer consumers in sync manner than you can use SynchronizeQueue from java.util.concurrent package – csk Aug 31 '13 at 13:13
  • He seems to be talking about a circular buffer - you overwrite the oldest. – Val Aug 31 '13 at 13:28
  • I actually just simply want to have one single element to consume. I would probably not even need a queue but just locks ... Sorry about being so unclear – Benedikt Bünz Aug 31 '13 at 13:32
  • It looks like a duplicate of http://stackoverflow.com/questions/11079210 – Val Aug 31 '13 at 13:35
  • @Val a CircularFifoBuffer doesn't have blocking removal – Benedikt Bünz Aug 31 '13 at 13:42
  • SynchronousQueue has an `offer` method, which transports the item if consumer is ready or discards the item, http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/SynchronousQueue.html#offer%28E%29. I think that such questions are better for IRC. – Val Aug 31 '13 at 13:50

4 Answers4

1

There is no need for a special data structure. Just use the methods available in Object. They are quite good in this situation, because the blocking consumer:

class ItemHolder<T> {
    private T item;
    public synchronized void produce(T item) {
        this.item = item;
        notify();
    }
    public synchronized T consume() {
        while (item == null) {
            wait();
        }
        T result = item;
        item = null;
        return result;
    }
}
nosid
  • 48,932
  • 13
  • 112
  • 139
  • Thread Scheduler can call `produce()` more than one. When then, you lose the other items. –  Aug 31 '13 at 13:11
0

Efficient Circular Buffer in Java

Overwriting Circular buffers are great data structures to use if you would like to operate on a recent window of data. Elements are added and removed FIFO like a Queue, except additions on full buffers will cause the oldest (head of the queue) element to be removed.

import java.util.NoSuchElementException;  
 /**  
  * Thread safe fixed size circular buffer implementation. Backed by an array.  
  *   
  * @author brad  
  */  
 public class ArrayCircularBuffer<T> {  
      // internal data storage  
      private T[] data;  
      // indices for inserting and removing from queue  
      private int front = 0;  
      private int insertLocation = 0;  
      // number of elements in queue  
      private int size = 0;  
      /**  
       * Creates a circular buffer with the specified size.  
       *   
       * @param bufferSize  
       *      - the maximum size of the buffer  
       */  
      public ArrayCircularBuffer(int bufferSize) {  
           data = (T[]) new Object[bufferSize];  
      }  
      /**  
       * Inserts an item at the end of the queue. If the queue is full, the oldest  
       * value will be removed and head of the queue will become the second oldest  
       * value.  
       *   
       * @param item  
       *      - the item to be inserted  
       */  
      public synchronized void insert(T item) {  
           data[insertLocation] = item;  
           insertLocation = (insertLocation + 1) % data.length;  
           /**  
            * If the queue is full, this means we just overwrote the front of the  
            * queue. So increment the front location.  
            */  
           if (size == data.length) {  
                front = (front + 1) % data.length;  
           } else {  
                size++;  
           }  
      }  
      /**  
       * Returns the number of elements in the buffer  
       *   
       * @return int - the number of elements inside this buffer  
       */  
      public synchronized int size() {  
           return size;  
      }  
      /**  
       * Returns the head element of the queue.  
       *   
       * @return T  
       */  
      public synchronized T removeFront() {  
           if (size == 0) {  
                throw new NoSuchElementException();  
           }  
           T retValue = data[front];  
           front = (front + 1) % data.length;  
           size--;  
           return retValue;  
      }  
      /**  
       * Returns the head of the queue but does not remove it.  
       *   
       * @return  
       */  
      public synchronized T peekFront() {  
           if (size == 0) {  
                return null;  
           } else {  
                return data[front];  
           }  
      }  
      /**  
       * Returns the last element of the queue but does not remove it.  
       *   
       * @return T - the most recently added value  
       */  
      public synchronized T peekLast() {  
           if (size == 0) {  
                return null;  
           } else {  
                int lastElement = insertLocation - 1;  
                if (lastElement < 0) {  
                     lastElement = data.length - 1;  
                }  
                return data[lastElement];  
           }  
      }  
 }  
Val
  • 1
  • 8
  • 40
  • 64
0

Here is Circular Bounded Queue which is (supposed to be)thread safe and provides a blocking take operation.

public class CircularQueue<T> {
    private final int MAX_SIZE;

    private final AtomicReferenceArray<T> buffer;
    private final AtomicInteger start;
    private final AtomicInteger end;
    private final AtomicInteger len;

    private final ReentrantLock rwlock;
    private final Condition readCondition;

    public CircularQueue(int size) {
        MAX_SIZE = size;
        buffer = new AtomicReferenceArray<T>(size);
        start = new AtomicInteger(0);
        end = new AtomicInteger(0);
        len = new AtomicInteger(0);
        rwlock = new ReentrantLock(true);
        readCondition =  rwlock.newCondition();
    }

    /** 
    * Adds to tail of the queue
    */
    public void put(T val) {
        try {
            rwlock.lock();

            buffer.set(end.get(), val);
            end.set((end.get() + 1) % MAX_SIZE);
            if (len.get() == MAX_SIZE) { // overwrite
                start.set((start.get() + 1) % MAX_SIZE);
            } else {
                len.incrementAndGet();
            }
            readCondition.signal();
        } finally {
            rwlock.unlock();
        }
    }

    /**
     * Blocking removeFront operation       
     * @return
     */
    public T take() {
        T val = null;
        try {
            rwlock.lock();
            while (len.get() == 0) {
                try {
                    readCondition.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }

            val = buffer.get(start.get());
            buffer.set(start.get(), null);
            start.set((start.get() + 1) % MAX_SIZE);
            len.decrementAndGet();
        } finally {
            rwlock.unlock();
        }

        return val;
    }

    public int size() {
        int curLen = 0;
        try {
            rwlock.lock();
            curLen = len.get();
        } finally {
            rwlock.unlock();
        }
        return curLen;
    }
}

There are many operations which are yet to be added like poll, offer etc. But you can test this out with some threads :

It is going to hang your JVM if it runs correctly.

public static void main(String[] args) {
    final int MAX_QUEUE_SIZE = 4;
    final CircularQueue<Integer> q = new CircularQueue<Integer>(MAX_QUEUE_SIZE);
    new Thread(new Runnable() {

        @Override
        public void run() {
            for (int i = 0; i < MAX_QUEUE_SIZE; ++i) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }

                System.out.println("Putting: from " + Thread.currentThread().getName() + " " +  i);
                q.put(i);
            }

            for (int i = 0; i < MAX_QUEUE_SIZE; ++i) {
                System.out.println("Trying to get from " + Thread.currentThread().getName() + " " + q.take());
            }   
        }
    }).start();

    new Thread(new Runnable() {

        @Override
        public void run() {
            for (int i = 10; i < 10 + MAX_QUEUE_SIZE; ++i) {
                try {
                    Thread.sleep(1001);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("Putting: from " + Thread.currentThread().getName() + " " +  i);
                q.put(i);
            }

            for (int i = 0; i < MAX_QUEUE_SIZE; ++i) {
                System.out.println("Trying to get from " + Thread.currentThread().getName() + " " + q.take());
            }               
        }
    }).start();

}

Your output should probably match

Putting: from Thread-0 0
Putting: from Thread-1 10
Putting: from Thread-0 1
Putting: from Thread-1 11
Putting: from Thread-0 2
Putting: from Thread-1 12
Putting: from Thread-0 3
Trying to get from Thread-0 11
Trying to get from Thread-0 2
Trying to get from Thread-0 12
Trying to get from Thread-0 3
Putting: from Thread-1 13
Trying to get from Thread-1 13

The other take operations from Thread-1 are waiting for a corresponding put operation since Thread-1 is slightly slower than Thread-0.

bsd
  • 2,707
  • 1
  • 17
  • 24
0

Simplest solution that Java provides for this is this:

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newSingleThreadExecutor()

Per doc: "Creates an Executor that uses a single worker thread operating off an unbounded queue, and uses the provided ThreadFactory to create a new thread when needed"

Rahul.B
  • 354
  • 3
  • 18