I also needed something very similar for testing that a certain asynchronous JMS message has been received within a certain timeout. It turns out that what you are asking for is relatively easy to implement using basic wait/notify, as explained in the Oracle tutorials. The idea is to make the put and query methods synchronized and let the query method do a wait. The put method calls notifyAll to wake up any threads waiting in the query method. The query method must then check whether the predicate is matched. The trickiest part is getting the timeout right, because a waiting thread can wake up while the predicate still does not match, and because of possible "spurious wakeups". I found this stackoverflow post that provides the answer.
Here is the implementation I came up with:
import java.util.ArrayList;
import java.util.List;
// import net.jcip.annotations.GuardedBy;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
/**
 * {@link PredicateConsumerCollection} implementation based on intrinsic
 * locking and {@code wait}/{@code notifyAll}.
 *
 * <p>
 * All methods synchronize on {@code this}. {@link #put} appends an element and
 * wakes every waiting thread; {@link #query} blocks until an element matching
 * the predicate is present or the timeout has elapsed. This class never removes
 * elements from its backing list.
 *
 * @param <T> the element type
 */
public class PredicateConsumerCollectionImpl<T> implements
        PredicateConsumerCollection<T> {

    private static final long NANOS_PER_MILLI = 1000000L;

    // @GuardedBy("this")
    private final List<T> elements = new ArrayList<>();

    /**
     * Appends {@code t} and wakes all threads currently blocked in
     * {@link #query}, so each of them can re-evaluate its predicate.
     *
     * @param t the element to add
     */
    @Override
    public synchronized void put(T t) {
        elements.add(t);
        notifyAll();
    }

    /**
     * Waits up to {@code millis} milliseconds for an element that satisfies
     * {@code p}.
     *
     * <p>
     * The elements already present are checked first, so a match that exists
     * on entry is returned without waiting, even for a zero or negative
     * timeout.
     *
     * @param p      the predicate an element must satisfy
     * @param millis the maximum time to wait in milliseconds; values that do
     *               not fit into a nanosecond count are treated as
     *               {@code Long.MAX_VALUE} nanoseconds
     * @return the first matching element in insertion order, or {@code null}
     *         if none appeared before the timeout elapsed
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting
     */
    @Override
    public synchronized T query(Predicate<T> p, long millis)
            throws InterruptedException {
        final long timeoutNanos = toNanosSaturated(millis);
        final long start = System.nanoTime();
        T match;
        // Loop because wait() may return without a matching element: another
        // put() may have added a non-matching element, or the wakeup may be
        // spurious.
        while ((match = Iterables.find(elements, p, null)) == null) {
            // Subtracting elapsed time from the budget (rather than comparing
            // against an absolute deadline) cannot overflow, because elapsed
            // time is non-negative and timeoutNanos is in [0, Long.MAX_VALUE].
            long remainingNanos = timeoutNanos - (System.nanoTime() - start);
            if (remainingNanos <= 0) {
                return null;
            }
            // remainingNanos > 0 here, so this never degenerates to wait(0, 0),
            // which would block without a timeout.
            wait(remainingNanos / NANOS_PER_MILLI,
                    (int) (remainingNanos % NANOS_PER_MILLI));
        }
        return match;
    }

    /**
     * Reports whether {@code t} is currently stored.
     *
     * @param t the element to look for
     * @return {@code true} if the backing list contains {@code t}
     */
    synchronized boolean contains(T t) {
        return elements.contains(t);
    }

    /**
     * Converts milliseconds to nanoseconds, clamping negative input to zero
     * and saturating at {@code Long.MAX_VALUE} instead of overflowing.
     */
    private static long toNanosSaturated(long millis) {
        if (millis <= 0) {
            return 0L;
        }
        if (millis > Long.MAX_VALUE / NANOS_PER_MILLI) {
            return Long.MAX_VALUE;
        }
        return millis * NANOS_PER_MILLI;
    }
}
And here is a JUnit test that proves that the code works as intended:
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Test;
import com.google.common.base.Predicate;
/**
 * Unit test for the {@link PredicateConsumerCollection} implementation.
 *
 * <p>
 * The tests act as consumers waiting for the test Producer to put a certain
 * String.
 */
public class PredicateConsumerCollectionTest {

    /**
     * Test producer. Its constructor immediately puts "Initial"; when run, it
     * puts "Hello", "I", "am", "done", "so" and "goodbye!" in that order,
     * sleeping 50 ms between consecutive puts.
     */
    private static class Producer implements Runnable {
        private static final int PAUSE_MILLIS = 50;
        private static final String[] WORDS = {
            "Hello", "I", "am", "done", "so", "goodbye!"
        };

        private final PredicateConsumerCollection<String> collection;

        public Producer(PredicateConsumerCollection<String> collection) {
            this.collection = collection;
            collection.put("Initial");
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < WORDS.length; i++) {
                    if (i > 0) {
                        Thread.sleep(PAUSE_MILLIS);
                    }
                    collection.put(WORDS[i]);
                }
            } catch (InterruptedException e) {
                // Calling fail() here would throw an AssertionError on the
                // producer thread, where the JUnit runner never sees it.
                // Restore the interrupt flag and stop producing instead; the
                // tests notice the truncated sequence through their own
                // assertions on the collection contents.
                Thread.currentThread().interrupt();
            }
        }
    }

    private PredicateConsumerCollectionImpl<String> collection;
    private Producer producer;

    @Before
    public void setup() {
        collection = new PredicateConsumerCollectionImpl<>();
        producer = new Producer(collection);
    }

    /** Builds a predicate that accepts exactly the given string. */
    private static Predicate<String> equalTo(final String expected) {
        return new Predicate<String>() {
            @Override
            public boolean apply(String s) {
                return expected.equals(s);
            }
        };
    }

    /**
     * The query must return "done" while the producer is still running, i.e.
     * before "goodbye!" has been put.
     */
    @Test(timeout = 2000)
    public void wait_for_done() throws InterruptedException {
        assertTrue(collection.contains("Initial"));
        assertFalse(collection.contains("Hello"));

        Thread producerThread = new Thread(producer);
        producerThread.start();

        String result = collection.query(equalTo("done"), 1000);

        assertEquals("done", result);
        assertTrue(collection.contains("Hello"));
        assertTrue(collection.contains("done"));
        // Timing-based: the producer still has two 50 ms pauses ahead of it.
        assertTrue(producerThread.isAlive());
        assertFalse(collection.contains("goodbye!"));

        producerThread.join();
        assertTrue(collection.contains("goodbye!"));
    }

    /**
     * An element that is already present must be returned without waiting for
     * further puts.
     */
    @Test(timeout = 2000)
    public void wait_for_done_immediately_happens() throws InterruptedException {
        Thread producerThread = new Thread(producer);
        producerThread.start();

        String result = collection.query(equalTo("Initial"), 1000);

        assertEquals("Initial", result);
        // Timing-based: "I" is put only after the producer's first 50 ms pause.
        assertFalse(collection.contains("I"));

        producerThread.join();
        assertTrue(collection.contains("goodbye!"));
    }

    /**
     * A predicate that never matches must make the query time out with
     * {@code null}; by then the producer has finished.
     */
    @Test(timeout = 2000)
    public void wait_for_done_never_happens() throws InterruptedException {
        Thread producerThread = new Thread(producer);
        producerThread.start();
        assertTrue(producerThread.isAlive());

        String result = collection.query(equalTo("DONE"), 1000);

        assertEquals(null, result);
        assertFalse(producerThread.isAlive());
        assertTrue(collection.contains("goodbye!"));
    }
}