6

I have producer and consumer connected with BlockingQueue.

Consumer wait records from queue and process it:

Record r = mQueue.take();
process(r);

I need pause this process for a while from other thread. How to implement it?

Now I think implement it such, but it's looks like bad solution:

private Object mLock = new Object();
private boolean mLocked = false;

public void lock() {
    mLocked = true;
}

public void unlock() {
    mLocked = false;
    mLock.notify();

}

public void run() {
    ....
            Record r = mQueue.take();
            if (mLocked) {
                mLock.wait();
            }
            process(r);
}
HotIceCream
  • 2,271
  • 2
  • 19
  • 27
  • 1
    Do you mind providing a bit more context on your specific need? The reason I ask, is that if I really need to pause someone in the chain, I would much rather pause the producer rather than the consumer. It basically achieves a similar effect without having the queue grow to capacity while the pause is in effect. – sstan Jul 22 '15 at 15:22

3 Answers3

2

You can use java.util.concurrent.locks.Condition Java docs to pause for a while based on same condition.

This is approach looks clean to me and ReentrantLock mechanism has better throughput than synchronized. Read below excerpt from IBM article

As a bonus, the implementation of ReentrantLock is far more scalable under contention than the current implementation of synchronized. (It is likely that there will be improvements to the contended performance of synchronized in a future version of the JVM.) This means that when many threads are all contending for the same lock, the total throughput is generally going to be better with ReentrantLock than with synchronized.


BlockingQueue are best known for solution of producer-consumer problem, and it also uses Condition for waiting.

See below example taken from Java doc's Condition, which is a sample implementation of producer - consumer pattern.

class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length)
         notFull.await();
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0)
         notEmpty.await();
       Object x = items[takeptr];
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally {
       lock.unlock();
     }
   }
 }

Further reading:

hagrawal7777
  • 14,103
  • 5
  • 40
  • 70
  • 3
    Mr. down Downvoter, can you take the pain to explain your down vote please? – hagrawal7777 Jul 22 '15 at 13:50
  • Man I hate downvotes when the person actually takes some time to provide an insightful explanation. If this solution doesn't work for you, fine. But it doesn't deserve a downvote. +1 for me even if I find the first one simpler, the second offers insight on threads and concurrency. – Pat B Jul 22 '15 at 13:55
  • 2
    not my downvote, but giving a performance judgment based on an article 10+ years old seems iffy. – Nathan Hughes Jul 22 '15 at 13:59
  • @PatB Thanks pal for the stand and support. – hagrawal7777 Jul 22 '15 at 14:04
  • @NathanHughes I would accept your comment, I tried to get some latest reference but couldn't find any in my quick search. But I think this will hold true because this is the same mechanism which is used in BlockingQueue implementations like `ArrayBlockingQueue`, `LinkedBlockingQueue` – hagrawal7777 Jul 22 '15 at 14:07
  • understood, just trying to think like the downvoter here. might help to add something like this to the answer? – Nathan Hughes Jul 22 '15 at 14:10
  • @NathanHughes Based on your comment, I have added a note for blocking queues to give OP confidence of something very latest. – hagrawal7777 Jul 22 '15 at 14:11
  • I don't downvote this answer. But I do not understand how to apply ReentrantLock to my question. More precisely, I tried to do it, but ran into problems. As I understand it, I do not need to use conditions. I need to use the lock / unlock only. But why thread hang in the method unlock? – HotIceCream Jul 22 '15 at 14:17
  • What problems you ran into? You don't have any condition or relation between consumers and producers like the one in example? "*But why thread hang in the method unlock?*" I haven't got it, you mean in your question? – hagrawal7777 Jul 22 '15 at 14:36
2

I think your solution is simple and elegant, and think you should keep it with some modifications. The modifications I propose are synchronization.

Without it, thread interference and memory consistancy errors can (and very often does) occur. On top of that, you can't wait or notify on a lock you don't own (and you own it if you have it inside a synchronized block..). The fix is easy, just add a mLock synchronize block where you wait/notify on it. Also, as you're changing mLocked from a different thread you will want to mark it volatile.

private Object mLock = new Object();
private volatile boolean mLocked = false;

public void lock() {
    mLocked = true;
}

public void unlock() {
    synchronized(mlock) {
        mLocked = false;
        mLock.notify();
    }

}

public void run() {
    ....
            Record r = mQueue.take();
            synchronized(mLock) {
                while (mLocked) {
                    mLock.wait();
                }
            }
            process(r);
}
ddmps
  • 4,350
  • 1
  • 19
  • 34
  • I wrote the same code. But then I thought that if you go in sync block with the wait, then I don't to go in sync block with notify. But I was wrong: http://stackoverflow.com/questions/13249835/java-does-wait-release-lock-from-synchronized-block – HotIceCream Jul 22 '15 at 14:06
  • 2
    May be will be better replace if (mLocked) on while(mLocked). http://stackoverflow.com/a/2779565/1173794 – HotIceCream Jul 22 '15 at 14:07
1

Create a new class that extends an implementation of BlockingQueue. Add two new methods pause() and unpause(). Where needed, consider the paused flag and use the other blockingQueue2 to wait (in my example only in the take() method, and not in put()):

public class BlockingQueueWithPause<E> extends LinkedBlockingQueue<E> {

    private static final long serialVersionUID = 184661285402L;

    private Object lock1 = new Object();//used in pause() and in take()
    private Object lock2 = new Object();//used in pause() and unpause()

    //@GuardedBy("lock")
    private volatile boolean paused;

    private LinkedBlockingQueue<Object> blockingQueue2 = new LinkedBlockingQueue<Object>();

    public void pause() {
        if (!paused) {
            synchronized (lock1) {
            synchronized (lock2) {
                if (!paused) {
                    paused = true;
                    blockingQueue2.removeAll();//make sure it is empty, e.g after successive calls to pause() and unpause() without any consumers it will remain unempty
                }
            }
            }
        }
    }

    public void unpause() throws InterruptedException {
        if (paused) {
            synchronized (lock2) {
                paused = false;
                blockingQueue2.put(new Object());//release waiting thread, if there is one
            }
        }
    }

    @Override
    public E take() throws InterruptedException {
        E result = super.take();

        if (paused) {
            synchronized (lock1) {//this guarantees that a single thread will be in the synchronized block, all other threads will be waiting
                if (paused) {
                    blockingQueue2.take();
                }
            }
        }

        return result;
    }

    //TODO override similarly the poll() method.
}
V G
  • 18,822
  • 6
  • 51
  • 89