
I have a generator class that owns a Thread. The thread first determines how many "records" will be generated, then generates that many records (which are placed in a BlockingQueue for retrieval by another thread).

I'd like the other thread to know how many records are going to be generated (for sensible progress reporting among other things).

It seems Future gives me exactly the interface I'm after, but I'm new to Java, and not sure of the idiomatic way of implementing it.

My background is in C++/Win32, so I'd normally use a win32 "Event" (as created by CreateEvent(0, true, false, 0), with SetEvent and WaitForSingleObject for my signal and wait implementations). I've noticed Java has a CountDownLatch, but this somehow feels heavier than what I'm after (somewhat akin to using an int when I really want a boolean), and it seems unintuitive for this purpose (to me, anyway).

So here's my code using CountDownLatch and a Future. I've distilled my real code down a bit here (removed irrelevant implementation details and ignoring all error handling).

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Future;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public abstract class Generator {

        private CountDownLatch numRecordsSignal = new CountDownLatch(1);

        private int numRecords;

        private BlockingQueue<Record> queue = new LinkedBlockingQueue<Record>();

        public Generator() {
            new Thread(new Runnable() {

                @Override
                public void run() {
                    numRecords = calculateNumRecords();
                    numRecordsSignal.countDown();

                    for (Record r : generateRecords()) {
                        try {
                            queue.put(r);
                        } catch (InterruptedException e) {
                            // [ ... snip ... ]
                        }
                    }                
                }
            }).start();
        }

        public Future<Integer> numRecords() {
            return new Future<Integer>() {
                // Ignore cancel for now (It wouldn't make sense to cancel 
                // just this part of the Generator's work, anyway).
                public boolean cancel(boolean mayInterruptIfRunning) { 
                    return false; 
                }

                public Integer get() throws InterruptedException {
                    numRecordsSignal.await();
                    return numRecords;
                }

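                public Integer get(long timeout, TimeUnit unit) 
                        throws InterruptedException, 
                               java.util.concurrent.TimeoutException {
                    // await() returns false if the timeout elapsed first
                    if (!numRecordsSignal.await(timeout, unit)) {
                        throw new java.util.concurrent.TimeoutException();
                    }
                    return numRecords;
                }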

                public boolean isCancelled() {
                    return false;
                }

                public boolean isDone() {
                    // Since we can't cancel, just check the state of the 
                    // signal
                    return numRecordsSignal.getCount() == 0;
                }
            };
        }

        public Record nextRecord() throws InterruptedException {
            return queue.take();
        }

        /** --- Boring stuff below this line --- */
        public interface Record { }

        protected abstract int calculateNumRecords();

        protected abstract Iterable<Record> generateRecords();                        
    }

Now for my actual questions:

  1. Is there a better mechanism than CountDownLatch for single-shot signalling?
  2. I want callers to be able to either wait or poll on the result, but don't need them to be able to cancel the operation. Is Future the right way to expose this stuff?
  3. Does any of this stuff look particularly "un-Java"? Am I on the wrong track completely?

Edit:

Just to clarify, I expect the caller to be able to do the following:

    Generator gen = new Generator();
    Integer numRecords = gen.numRecords().get();  // This call might block waiting for the result
    numRecords = gen.numRecords().get(); // This call will never block, as the result is already available.

It's just a slow-to-initialise value I'm trying to implement. Once the "initialised" condition is met, it should latch. The value doesn't get re-evaluated once it's known.

Martin

5 Answers

3

Side comment

You should not start a thread in a constructor: it is very conceivable that the Generator object is not fully constructed when the thread starts, and the state of a concrete subclass, for example, may not be initialised yet when the thread calls the abstract methods. You can create the thread in the constructor, but should start it in a separate method. Your calling code would become:

    Generator g = new Generator();
    g.start();

Your question

You are reimplementing Future yourself, which is neither necessary nor desirable in my opinion. I would redesign the class so that Generator implements Callable<Integer> and run it through an executor. That provides you with several things:

  • it removes the threading logic from the Generator, which lets you manage your threads more efficiently at a higher level in your call stack
  • the integer is returned via the future in your calling code and you rely on the JDK to handle the implementation
  • I have assumed that it's ok to first populate the queue then return the integer
  • you can call future.get() as many times as you want - it will only block the first time it is called.

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        Future<Integer> future = executor.submit(new GeneratorImpl()); //a concrete implementation of Generator
        int numRecords = 0;
        try {
            numRecords = future.get(); //you can use a get with timeout here
        } catch (ExecutionException e) {
            //an exception happened in Generator#call()
        } catch (InterruptedException e) {
            //handle it
        }

        //don't forget to call executor.shutdown() when you don't need it any longer
    }

    public abstract class Generator implements Callable<Integer> {

        private BlockingQueue<Record> queue = new LinkedBlockingQueue<Record>();

        @Override
        public Integer call() {
            int numRecords = calculateNumRecords();
            for (Record r : generateRecords()) {
                try {
                    queue.put(r);
                } catch (InterruptedException e) {
                    // [ ... snip ... ]
                }
            }
            return numRecords;
        }

        public Record nextRecord() throws InterruptedException {
            return queue.take();
        }

        /**
         * --- Boring stuff below this line ---
         */
        public interface Record {
        }

        protected abstract int calculateNumRecords();

        protected abstract Iterable<Record> generateRecords();
    }

EDIT

If you need to return numRecords as soon as possible, you can populate your queue in a separate thread:

    public Integer call() {
        int numRecords = calculateNumRecords();
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (Record r : generateRecords()) {
                    try {
                        queue.put(r);
                    } catch (InterruptedException e) {
                        // [ ... snip ... ]
                    }
                }
            }
        }).start(); //returns immediately
        return numRecords;
    }
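
A further variation on "rely on the JDK to handle the implementation", as a minimal sketch only: if the count really is known before the records, the worker thread can run a `FutureTask` for the count and then carry on producing records. The class name `FutureTaskGenerator`, the type parameter `R` (standing in for `Record`) and the `start()` method are assumptions made for this illustration, not part of the original code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical variant of Generator: the record count is published
// through a JDK FutureTask instead of a hand-written Future.
abstract class FutureTaskGenerator<R> {

    private final BlockingQueue<R> queue = new LinkedBlockingQueue<R>();

    // Completes as soon as calculateNumRecords() has returned.
    private final FutureTask<Integer> numRecords = new FutureTask<Integer>(
            new Callable<Integer>() {
                @Override
                public Integer call() {
                    return calculateNumRecords();
                }
            });

    /** Starts the worker; deliberately kept out of the constructor. */
    public void start() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                numRecords.run(); // computes the count and completes the future
                try {
                    for (R r : generateRecords()) {
                        queue.put(r);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt
                }
            }
        }).start();
    }

    /** May block on the first get(); later calls return the cached value. */
    public Future<Integer> numRecords() {
        return numRecords;
    }

    public R nextRecord() throws InterruptedException {
        return queue.take();
    }

    protected abstract int calculateNumRecords();

    protected abstract Iterable<R> generateRecords();
}
```

One trade-off to be aware of: unlike the hand-written Future in the question, the Future returned here can be cancelled by callers, which may or may not be acceptable.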
assylias
  • This is the problem with distilling the questions down so far (sorry). I need to be able to get the metadata (numRecords in my example) as early as possible. I've tried to convey that by suggesting it might get used for progress reporting, which would require that it come through before the data. – Martin Aug 02 '12 at 10:12
  • +1 for the comment on initialisation. I assumed the initialisers would run between super construction and the class constructor. And thanks for the effort in explaining your alternate approach. – Martin Aug 02 '12 at 10:14
  • @Martin See my edit. There are of course other approaches. For example, if each record takes time to generate, you could use a [CompletionService](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html) which allows you to get the records as they become available instead of waiting until all of them are generated. – assylias Aug 02 '12 at 10:28
  • Thanks again. Unfortunately in my actual case, the metadata isn't necessarily available before *all* of the records, and I don't want to delay record processing. The data is coming from a stream that I have little control over, and I can't predict the precise order that records and metadata come in. I hadn't been aware of CompletionService, and that's likely to come in handy later in other parts of my system. The records here are typically quick to generate, but there'll be lots of them, and they're sometimes slow to consume. – Martin Aug 03 '12 at 00:46
1

The standard Java equivalents of "WaitForSingleObject()" and "SetEvent()" for Java threads are "wait()", "notify()" and "notifyAll()".
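
On their own, wait() and notifyAll() do not remember that a notification happened, so they are usually paired with a flag that is checked under the same lock. A minimal sketch of a manual-reset style event built that way follows; `ManualResetEvent` is a hypothetical helper written for this illustration, not a JDK class.

```java
// Hypothetical helper, not part of the JDK: a manual-reset event
// built from a boolean flag plus wait()/notifyAll().
final class ManualResetEvent {
    private final Object lock = new Object();
    private boolean signalled = false; // guarded by lock

    /** Roughly SetEvent: wakes current waiters and lets later callers through. */
    public void set() {
        synchronized (lock) {
            signalled = true;
            lock.notifyAll();
        }
    }

    /** Roughly WaitForSingleObject without a timeout. */
    public void await() throws InterruptedException {
        synchronized (lock) {
            while (!signalled) { // loop also guards against spurious wakeups
                lock.wait();
            }
        }
    }

    public boolean isSet() {
        synchronized (lock) {
            return signalled;
        }
    }
}
```

Because the flag is never cleared, every await() after set() returns immediately, which is the latching behaviour the question asks for.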

paulsm4
  • But doesn't wait "un-notify" the object? WaitForSingleObject won't unsignal an event if it's configured for manual reset. I want all callers currently waiting on the object, *and all future callers* to be unblocked. – Martin Aug 02 '12 at 06:27
  • @Martin - yes - that's where the difference between Semaphore and Condition matters. – Fakrudeen Aug 02 '12 at 07:40
1

After looking at implementing my own signal mechanism and following the bread-crumb trail left by others doing the same thing, I came across the javadoc for AbstractQueuedSynchronizer, which includes a code snippet for a "BooleanLatch", which perfectly meets my needs:

    class BooleanLatch {
        private static class Sync extends AbstractQueuedSynchronizer {
            boolean isSignalled() { return getState() != 0; }

            protected int tryAcquireShared(int ignore) {
                return isSignalled() ? 1 : -1;
            }

            protected boolean tryReleaseShared(int ignore) {
                setState(1);
                return true;
            }
        }

        private final Sync sync = new Sync();
        public boolean isSignalled() { return sync.isSignalled(); }
        public void signal()         { sync.releaseShared(1); }
        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    }

Doing a bit more searching, I found that a number of frameworks include a BooleanLatch (Apache Qpid being one). Some implementations (such as Atlassian's), are auto-resetting, which would make them inappropriate for my needs.
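
To back a `Future.get(timeout, unit)` with such a latch, a timed wait is also needed. The sketch below is one possible extension, assuming the same Sync design as the javadoc snippet; the class name `TimedBooleanLatch` and the timed `await` overload are additions made for illustration, built on AbstractQueuedSynchronizer's `tryAcquireSharedNanos`.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical extension of the javadoc's BooleanLatch with a timed wait.
final class TimedBooleanLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isSignalled() { return getState() != 0; }

        @Override
        protected int tryAcquireShared(int ignore) {
            return isSignalled() ? 1 : -1; // succeed only once signalled
        }

        @Override
        protected boolean tryReleaseShared(int ignore) {
            setState(1); // latch open; never reset
            return true;
        }
    }

    private final Sync sync = new Sync();

    public boolean isSignalled() { return sync.isSignalled(); }

    public void signal() { sync.releaseShared(1); }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /** Returns true if signalled, false if the timeout elapsed first. */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
}
```

A `CountDownLatch` created with a count of 1 offers the same operations (`countDown()`, `await()`, timed `await`), so this is mostly a matter of which name reads better at the call site.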

Martin
0

The standard observer notification pattern could be helpful here, if I understood your problem correctly.

arya
  • That pushes threading issues back onto the caller though: the observer notification would happen on a different thread to all the other caller action. I want the processing to happen on a thread that is never exposed to the caller. – Martin Aug 02 '12 at 09:40
0

For one-shot signalling in this scenario, a Semaphore is better because it remembers the "signal". A Condition object [wait() waits on a condition] won't remember the signal.

    Semaphore numRecordsUpdated = new Semaphore(0);

In Generator

    numRecordsUpdated.release();

In consumer

    numRecordsUpdated.acquire();
Fakrudeen
  • Wouldn't the consumer also need to re-release the Semaphore? It should be possible for the consumer to grab the result twice in a row. The first call might block, but the second call should always return correctly without blocking. – Martin Aug 02 '12 at 09:46
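
The re-release raised in the last comment could look roughly like the sketch below: each waiter takes the single permit and hands it straight back, so the gate stays open for the next caller. `SemaphoreGate` and its method names are hypothetical, introduced only for this illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper: a Semaphore used as a one-shot gate that stays open.
final class SemaphoreGate {
    private final Semaphore permit = new Semaphore(0);

    /** Called by the producer once the value is ready. */
    public void open() {
        permit.release();
    }

    /** Blocks until open() has been called. */
    public void await() throws InterruptedException {
        permit.acquire();
        permit.release(); // hand the permit back for the next caller
    }

    /** Timed variant: returns false if the gate did not open in time. */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        if (!permit.tryAcquire(timeout, unit)) {
            return false;
        }
        permit.release(); // hand the permit back for the next caller
        return true;
    }
}
```

Note that concurrent waiters pass through one at a time, each briefly holding the permit, so a latch-style primitive is probably a more natural fit for the "all current and future callers" requirement.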