
Does anyone know if there is any latch implementation that does the following:

  • has a method to decrement the latch value, or wait if the value is zero
  • has a method for waiting for the latch value to be zero
  • has a method for adding a number to the latch's value
Razvi

6 Answers


You could also use a Phaser (java.util.concurrent.Phaser)

final Phaser phaser = new Phaser(1); // register self
while (/* some condition */) {
    phaser.register(); // Equivalent to countUp
    // do some work asynchronously, invoking
    // phaser.arriveAndDeregister() (equiv to countDown) in a finally block
}
phaser.arriveAndAwaitAdvance(); // await any async tasks to complete
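
For reference, here is a self-contained sketch of that pattern (the class name, task count and printed messages are invented for illustration):

import java.util.concurrent.Phaser;

public class PhaserLatchDemo {
    public static void main(String[] args) {
        final Phaser phaser = new Phaser(1);          // register self (the main thread)

        for (int i = 0; i < 3; i++) {
            final int id = i;
            phaser.register();                        // "count up" before handing off work
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("task " + id + " working");
                    } finally {
                        phaser.arriveAndDeregister(); // "count down" when done
                    }
                }
            }).start();
        }

        phaser.arriveAndAwaitAdvance();               // wait for all registered parties
        System.out.println("all tasks finished");
    }
}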
Michael-7
  • I found this a lot more flexible than any other option. – donlys Jan 11 '17 at 20:28
  • *Note for Android*: `Phaser` is not available below Android SDK 21. If you're using Kotlin, neither lint nor the compiler warns you about this. – Gurupad Mamadapur Dec 28 '18 at 10:28
  • It took me a while to wrap my head around what a Phaser is doing. I would say there are two concepts: registered and awaiting. To await (make a thread wait), you need to be registered. That's why `Michael-7` (OP) initialized `Phaser(1)` instead of `Phaser()`. If he had just used `Phaser()`, he would also need to call `phaser.register()`. – Ben Butterworth Mar 23 '21 at 21:39

java.util.concurrent.Semaphore seems to fit the bill.

  • acquire() or acquire(n)
  • also acquire() (not sure I understand what the difference is here) (*)
  • release() or release(n)

(*) Okay, there is no built-in method to wait until the semaphore becomes unavailable. I suppose you'd write your own wrapper for acquire that does a tryAcquire first and if that fails triggers your "busy event" (and continues using the normal acquire). Everyone would need to call your wrapper. Maybe subclass Semaphore?
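
A rough sketch of what that wrapper could look like (the class name NotifyingSemaphore and the onBusy callback are invented here, and it wraps rather than subclasses Semaphore):

import java.util.concurrent.Semaphore;

public class NotifyingSemaphore {
    private final Semaphore semaphore;
    private final Runnable onBusy; // called when no permit is immediately available

    public NotifyingSemaphore(int permits, Runnable onBusy) {
        this.semaphore = new Semaphore(permits);
        this.onBusy = onBusy;
    }

    public void acquire() throws InterruptedException {
        if (!semaphore.tryAcquire()) {
            onBusy.run();         // the "busy event" mentioned above
            semaphore.acquire();  // then fall back to a normal blocking acquire
        }
    }

    public void release() {
        semaphore.release();
    }
}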

Thilo
  • It's almost good. But I can't wait for it to be zero from what I know. – Razvi Jan 10 '13 at 10:00
  • also acquire() is the same as acquire(1) – Razvi Jan 10 '13 at 10:00
  • @assylias: Thanks, updated (I just take what Google gives me when I search the class name) – Thilo Jan 10 '13 at 10:01
  • according to documentation. – Subhrajyoti Majumder Jan 10 '13 at 10:02
  • You can wait for it to be zero by acquiring a lease (and releasing it immediately after if you don't want to block others). – Thilo Jan 10 '13 at 10:02
  • Stack Overflow could use some support to automatically transform/include JDK Javadoc links to the newest version... – Thilo Jan 10 '13 at 10:03
  • @Thilo I'm not sure what you are saying works, or I'm not understanding properly what you said. If you call acquire you get a lease, but I need to wait for all leases to be taken by some other threads. – Razvi Jan 10 '13 at 10:05
  • @Thilo Yeah, I've been thinking of using AbstractQueuedSynchronizer to implement a custom latch/semaphore. But I don't think I can use that either since I have two opposing ways in which the threads can block (one happens if the value is zero and one if the value is not zero). So I probably have to go lower and use LockSupport to implement it. – Razvi Jan 10 '13 at 10:13

Instead of starting from scratch with AQS, you could use a simple implementation like the one below. It is somewhat naive (it relies on synchronized blocks rather than AQS's lock-free algorithms), but unless you expect to use it in a contended scenario it could be good enough.

import java.util.concurrent.CountDownLatch;

public class CountUpAndDownLatch {
    private CountDownLatch latch;
    private final Object lock = new Object();

    public CountUpAndDownLatch(int count) {
        this.latch = new CountDownLatch(count);
    }

    public void countDownOrWaitIfZero() throws InterruptedException {
        synchronized(lock) {
            while(latch.getCount() == 0) {
                lock.wait();
            }
            latch.countDown();
            lock.notifyAll();
        }
    }

    public void waitUntilZero() throws InterruptedException {
        synchronized(lock) {
            while(latch.getCount() != 0) {
                lock.wait();
            }
        }
    }

    public void countUp() { //should probably check for Integer.MAX_VALUE
        synchronized(lock) {
            latch = new CountDownLatch((int) latch.getCount() + 1);
            lock.notifyAll();
        }
    }

    public int getCount() {
        synchronized(lock) {
            return (int) latch.getCount();
        }
    }
}

Note: I have not tested it in depth but it seems to behave as expected:

public static void main(String[] args) throws InterruptedException {
    final CountUpAndDownLatch latch = new CountUpAndDownLatch(1);
    Runnable up = new Runnable() {
        @Override
        public void run() {
            System.out.println("IN UP " + latch.getCount());
            latch.countUp(); // countUp() does not throw InterruptedException
            System.out.println("UP " + latch.getCount());
        }
    };

    Runnable downOrWait = new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("IN DOWN " + latch.getCount());
                latch.countDownOrWaitIfZero();
                System.out.println("DOWN " + latch.getCount());
            } catch (InterruptedException ex) {
            }
        }
    };

    Runnable waitFor0 = new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("WAIT FOR ZERO " + latch.getCount());
                latch.waitUntilZero();
                System.out.println("ZERO " + latch.getCount());
            } catch (InterruptedException ex) {
            }
        }
    };
    new Thread(waitFor0).start();
    up.run();
    downOrWait.run();
    Thread.sleep(100);
    downOrWait.run();
    new Thread(up).start();
    downOrWait.run();
}

Output:

IN UP 1
UP 2
WAIT FOR ZERO 1
IN DOWN 2
DOWN 1
IN DOWN 1
ZERO 0
DOWN 0
IN DOWN 0
IN UP 0
DOWN 0
UP 0
assylias

For those needing an AQS based solution, here's one that worked for me:

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class CountLatch {

    private class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        public Sync() {
        }

        @Override
        protected int tryAcquireShared(int arg) {
            return count.get() == releaseValue ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            return true;
        }
    }

    private final Sync sync;
    private final AtomicLong count;
    private volatile long releaseValue;

    public CountLatch(final long initial, final long releaseValue) {
        this.releaseValue = releaseValue;
        this.count = new AtomicLong(initial);
        this.sync = new Sync();
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public long countUp() {
        final long current = count.incrementAndGet();
        if (current == releaseValue) {
            sync.releaseShared(0);
        }
        return current;
    }

    public long countDown() {
        final long current = count.decrementAndGet();
        if (current == releaseValue) {
            sync.releaseShared(0);
        }
        return current;
    }

    public long getCount() {
        return count.get();
    }
}

You initialize the synchronizer with an initial and a target value. Once the target value has been reached (by counting up and/or down), the waiting threads are released.
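
For illustration, a hypothetical usage sketch (the demo class, the thread count and the target value of 3 are invented here): start at 0 with a release value of 3, let worker threads count up, and have the waiting thread block in await() until the target is hit:

public class CountLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        final CountLatch latch = new CountLatch(0, 3); // initial 0, release at 3

        for (int i = 0; i < 3; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // ... do some work ...
                    latch.countUp(); // releases waiters once the count hits 3
                }
            }).start();
        }

        latch.await(); // blocks until the count reaches the release value
        System.out.println("count reached " + latch.getCount());
    }
}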

Michael-7

I needed one and built it using the same strategy as CountDownLatch, which uses AQS (non-blocking). This class is also very similar (if not identical) to one created for Apache Camel, and I think it is also lighter than the JDK's Phaser. It behaves just like the JDK's CountDownLatch: it won't let you count down below zero, and it allows you to count both down and up:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
    
public class CountingLatch
{
  /**
   * Synchronization control for CountingLatch.
   * Uses AQS state to represent count.
   */
  private static final class Sync extends AbstractQueuedSynchronizer
  {
    private Sync()
    {
    }

    private Sync(final int initialState)
    {
      setState(initialState);
    }

    int getCount()
    {
      return getState();
    }

    protected int tryAcquireShared(final int acquires)
    {
      return getState()==0 ? 1 : -1;
    }

    protected boolean tryReleaseShared(final int delta)
    {
      // Decrement count; signal when transition to zero
      for(; ; ){
        final int c=getState();
        final int nextc=c+delta;
        if(nextc<0){
          return false;
        }
        if(compareAndSetState(c,nextc)){
          return nextc==0;
        }
      }
    }
  }

  private final Sync sync;

  public CountingLatch()
  {
    sync=new Sync();
  }

  public CountingLatch(final int initialCount)
  {
    sync=new Sync(initialCount);
  }

  public void increment()
  {
    sync.releaseShared(1);
  }

  public int getCount()
  {
    return sync.getCount();
  }

  public void decrement()
  {
    sync.releaseShared(-1);
  }

  public void await() throws InterruptedException
  {
    sync.acquireSharedInterruptibly(1);
  }

  public boolean await(final long timeout) throws InterruptedException
  {
    return sync.tryAcquireSharedNanos(1,TimeUnit.MILLISECONDS.toNanos(timeout));
  }
}
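
If it helps, here is a minimal usage sketch (invented for illustration): it is used the same way as a CountDownLatch, except the count goes up as work is submitted and back down as it completes:

public class CountingLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        final CountingLatch latch = new CountingLatch(); // starts at 0

        for (int i = 0; i < 4; i++) {
            latch.increment(); // count up before submitting the task
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // ... do some work ...
                    } finally {
                        latch.decrement(); // count down when done
                    }
                }
            }).start();
        }

        latch.await(); // returns once the count is back down to zero
        System.out.println("all work done, count = " + latch.getCount());
    }
}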
Guido Medina

This is a variation on CounterLatch, available from the Apache site.

Their version, for reasons best known to themselves, blocks the caller thread while the variable (AtomicInteger) is at a given value.

But it is very easy to tweak this code so that you can choose either exactly what the Apache version does, or the opposite: "wait here until the counter reaches a certain value". Arguably the latter has wider applicability. In my particular case I rustled this up because I wanted to check that all "chunks" had been published in SwingWorker.process()... but I have since found other uses for it.

Here it is written in Jython, officially the best language in the world (TM). I am going to rustle up a Java version in due course.

import java

class CounterLatch():
    def __init__( self, initial = 0, wait_value = 0, lift_on_reached = True ):
        self.count = java.util.concurrent.atomic.AtomicLong( initial )
        self.signal = java.util.concurrent.atomic.AtomicLong( wait_value )

        class Sync( java.util.concurrent.locks.AbstractQueuedSynchronizer ):
            def tryAcquireShared( sync_self, arg ):
                if lift_on_reached:
                    return -1 if (( not self.released.get() ) and self.count.get() != self.signal.get() ) else 1
                else:
                    return -1 if (( not self.released.get() ) and self.count.get() == self.signal.get() ) else 1
            def tryReleaseShared( self, args ):
                return True

        self.released = java.util.concurrent.atomic.AtomicBoolean() # initialised at False
        self.sync = Sync()

    def await( self, *args ):
        if args:
            assert len( args ) == 2
            assert type( args[ 0 ] ) is int
            timeout = args[ 0 ]
            assert type( args[ 1 ] ) is java.util.concurrent.TimeUnit
            unit = args[ 1 ]
            return self.sync.tryAcquireSharedNanos(1, unit.toNanos(timeout))
        else:
            self.sync.acquireSharedInterruptibly( 1 )

    def count_relative( self, n ):
        updated = self.count.addAndGet( n ) # addAndGet returns the updated value
        if updated == self.signal.get():
            self.sync.releaseShared( 0 )
        return updated

NB the Apache version uses the keyword volatile for signal and released. In Jython I don't think this exists as such, but using AtomicLong and AtomicBoolean should ensure that no values are "out of date" in any thread.

Example usage:

In the SwingWorker constructor:

self.publication_counter_latch = CounterLatch() 

In SW.publish:

# increase counter value BEFORE publishing chunks
self.publication_counter_latch.count_relative( len( chunks ) )
self.super__publish( chunks )

In SW.process:

# ... do sthg [HERE] with the chunks!
# AFTER having done what you want to do with your chunks:
self.publication_counter_latch.count_relative( - len( chunks ) )

In the thread waiting for chunk processing to stop:

worker.publication_counter_latch.await()
mike rodent