This is a variation on CounterLatch
, available from the Apache site.
Their version, for reasons best known to themselves, blocks the caller thread while the variable (AtomicInteger
) is at a given value.
But it is the height of easiness to tweak this code so that you can choose either just what the Apache version does, or... to say "wait here until the counter reaches a certain value". Arguably the latter is going to have more applicability. In my particular case I rustled this up because I wanted to check that all "chunks" had been published in SwingWorker.process()
... but I have since found other uses for it.
Here it is written in Jython, officially the best language in the world (TM). I am going to rustle up a Java version in due course.
class CounterLatch():
def __init__( self, initial = 0, wait_value = 0, lift_on_reached = True ):
self.count = java.util.concurrent.atomic.AtomicLong( initial )
self.signal = java.util.concurrent.atomic.AtomicLong( wait_value )
class Sync( java.util.concurrent.locks.AbstractQueuedSynchronizer ):
def tryAcquireShared( sync_self, arg ):
if lift_on_reached:
return -1 if (( not self.released.get() ) and self.count.get() != self.signal.get() ) else 1
else:
return -1 if (( not self.released.get() ) and self.count.get() == self.signal.get() ) else 1
def tryReleaseShared( self, args ):
return True
self.sync = Sync()
self.released = java.util.concurrent.atomic.AtomicBoolean() # initialised at False
def await( self, *args ):
if args:
assert len( args ) == 2
assert type( args[ 0 ] ) is int
timeout = args[ 0 ]
assert type( args[ 1 ] ) is java.util.concurrent.TimeUnit
unit = args[ 1 ]
return self.sync.tryAcquireSharedNanos(1, unit.toNanos(timeout))
else:
self.sync.acquireSharedInterruptibly( 1 )
def count_relative( self, n ):
previous = self.count.addAndGet( n )
if previous == self.signal.get():
self.sync.releaseShared( 0 )
return previous
NB the Apache version uses the keyword volatile
for signal
and released
. In Jython I don't think this exists as such, but using AtomicInteger
and AtomicBoolean
should ensure that no values are "out of date" in any thread.
Example usage:
In the SwingWorker constructor:
self.publication_counter_latch = CounterLatch()
In SW.publish:
# increase counter value BEFORE publishing chunks
self.publication_counter_latch.count_relative( len( chunks ) )
self.super__publish( chunks )
In SW.process:
# ... do sthg [HERE] with the chunks!
# AFTER having done what you want to do with your chunks:
self.publication_counter_latch.count_relative( - len( chunks ) )
In the thread waiting for chunk processing to stop:
worker.publication_counter_latch.await()