2

I'm looking for a Java collection that supports blocking reads on a predicate. I wrote a simple version, but it seems like this must have been invented already.

For example:

interface PredicateConsumerCollection<T> {

  public void put(T t);

  @Nullable
  public T get(Predicate<T> p, long millis) throws InterruptedException;
}

put() delivers its argument to a waiting consumer with a matching predicate, or stashes it in a store. A get() returns immediately if a suitable T is already in the store, or blocks till a suitable value is put(), or times out. Consumers compete but fairness isn't critical in my case.

Is anyone aware of such a collection?

tariksbl

2 Answers

2

There is no ready-made class that solves your problem, but a combination of a ConcurrentHashMap and a BlockingQueue could be a solution.

The hash map is defined as:

final ConcurrentHashMap<Predicate, LinkedBlockingQueue<Result>> lookup;

The put needs to ensure that a queue is added to the map for each Predicate; this can be done in a thread-safe way using putIfAbsent.

If you have a fixed set of Predicates, you can simply pre-fill the map; a consumer can then call lookup.get(Predicate).take()

If the number of Predicates is unknown or too large, you need to write your own wait/notify implementation for consumers, to cover the case where a Predicate is not yet in the map.
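A minimal sketch of the map-of-queues idea, under some assumptions: the class name `KeyedBlockingQueues` and the generic key type `K` are hypothetical, the key stands in for whatever identifies a predicate, and `computeIfAbsent` (Java 8+) is used in place of `putIfAbsent`. Because the consumer also creates the queue when it is missing, it can block on a key that no producer has used yet. It only fits when every value belongs to exactly one key.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: one unbounded blocking queue per key, created on demand.
class KeyedBlockingQueues<K, V> {

    private final ConcurrentMap<K, BlockingQueue<V>> lookup = new ConcurrentHashMap<>();

    // Returns the queue for the key, creating it atomically if it is missing.
    private BlockingQueue<V> queueFor(K key) {
        return lookup.computeIfAbsent(key, k -> new LinkedBlockingQueue<>());
    }

    // Producer side: never blocks, because the queues are unbounded.
    public void put(K key, V value) {
        queueFor(key).add(value);
    }

    // Consumer side: waits up to millis for a value; returns null on timeout.
    public V poll(K key, long millis) throws InterruptedException {
        return queueFor(key).poll(millis, TimeUnit.MILLISECONDS);
    }
}
```

A consumer would call something like `queues.poll("orders", 500)` while a producer calls `queues.put("orders", value)`; the string key here is only an illustration.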

TwoThe
  • This only works if every predicate describes a distinct set of `Result`s. If you have, for example, two predicates `x<3` and `x<5` then the result sets overlap and you need to store the `Result` in two queues if `Result.x=1`. – Ralf Feb 01 '14 at 14:04
  • Good suggestion and thanks! But unfortunately in my case the predicates are not known ahead of time. – tariksbl Feb 02 '14 at 14:48
0

I also needed something very similar, for testing that a certain asynchronous JMS message has been received within a certain timeout. It turns out that this is relatively easy to implement using basic wait/notify, as explained in the Oracle tutorials. The idea is to make the put and query methods synchronized and let the query method do a wait. The put method calls notifyAll to wake up any threads waiting in the query method. The query method must then check whether the predicate is matched. The trickiest part is getting the timeout right, because a thread can wake up when the predicate does not match and because of possible "spurious wakeups". I found this Stack Overflow post that provides the answer.

Here is the implementation I came up with:

import java.util.ArrayList;
import java.util.List;

// import net.jcip.annotations.GuardedBy;

import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;

public class PredicateConsumerCollectionImpl<T> implements
        PredicateConsumerCollection<T> {

    // @GuardedBy("this")
    private List<T> elements = new ArrayList<>();

    @Override
    public synchronized void put(T t) {
        elements.add(t);
        notifyAll();
    }

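    // Implements the interface method from the question by delegating to query().
    @Override
    public T get(Predicate<T> p, long millis) throws InterruptedException {
        return query(p, millis);
    }

    // Returns the first stored element matching p, waiting up to millis for one
    // to be put(); returns null on timeout. The element stays in the store.
    public synchronized T query(Predicate<T> p, long millis)
            throws InterruptedException {
        T match = null;
        long nanosOfOneMilli = 1000000L;
        long endTime = System.nanoTime() + millis * nanosOfOneMilli;
        while ((match = Iterables.find(elements, p, null)) == null) {
            // Recompute the remaining time after every wakeup (spurious or not).
            long sleepTime = endTime - System.nanoTime();
            if (sleepTime <= 0) {
                return null;
            }
            wait(sleepTime / nanosOfOneMilli,
                    (int) (sleepTime % nanosOfOneMilli));
        }
        return match;
    }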

    synchronized boolean contains(T t) {
        return elements.contains(t);
    }
}

And here is a JUnit test that checks that the code works as intended:

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import org.junit.Before;
import org.junit.Test;

import com.google.common.base.Predicate;

/**
 * Unit test for the {@link PredicateConsumerCollection} implementation.
 * 
 * <p>
 * The tests act as consumers waiting for the test Producer to put a certain
 * String.
 */
public class PredicateConsumerCollectionTest {

    private static class Producer implements Runnable {

        private PredicateConsumerCollection<String> collection;

        public Producer(PredicateConsumerCollection<String> collection) {
            this.collection = collection;
            collection.put("Initial");
        }

        @Override
        public void run() {
            try {
                int millis = 50;
                collection.put("Hello");
                Thread.sleep(millis);
                collection.put("I");
                Thread.sleep(millis);
                collection.put("am");
                Thread.sleep(millis);
                collection.put("done");
                Thread.sleep(millis);
                collection.put("so");
                Thread.sleep(millis);
                collection.put("goodbye!");
            } catch (InterruptedException e) {
                e.printStackTrace();
                fail("Unexpected InterruptedException");
            }
        }

    }

    private PredicateConsumerCollectionImpl<String> collection;
    private Producer producer;

    @Before
    public void setup() {
        collection = new PredicateConsumerCollectionImpl<>();
        producer = new Producer(collection);
    }

    @Test(timeout = 2000)
    public void wait_for_done() throws InterruptedException {
        assertTrue(collection.contains("Initial"));
        assertFalse(collection.contains("Hello"));

        Thread producerThread = new Thread(producer);
        producerThread.start();

        String result = collection.query(new Predicate<String>() {
            @Override
            public boolean apply(String s) {
                return "done".equals(s);
            }
        }, 1000);
        assertEquals("done", result);
        assertTrue(collection.contains("Hello"));
        assertTrue(collection.contains("done"));

        assertTrue(producerThread.isAlive());
        assertFalse(collection.contains("goodbye!"));

        producerThread.join();

        assertTrue(collection.contains("goodbye!"));
    }

    @Test(timeout = 2000)
    public void wait_for_done_immediately_happens() throws InterruptedException {
        Thread producerThread = new Thread(producer);
        producerThread.start();

        String result = collection.query(new Predicate<String>() {
            @Override
            public boolean apply(String s) {
                return "Initial".equals(s);
            }
        }, 1000);
        assertEquals("Initial", result);
        assertFalse(collection.contains("I"));

        producerThread.join();

        assertTrue(collection.contains("goodbye!"));
    }

    @Test(timeout = 2000)
    public void wait_for_done_never_happens() throws InterruptedException {
        Thread producerThread = new Thread(producer);
        producerThread.start();

        assertTrue(producerThread.isAlive());

        String result = collection.query(new Predicate<String>() {
            @Override
            public boolean apply(String s) {
                return "DONE".equals(s);
            }
        }, 1000);

        assertEquals(null, result);
        assertFalse(producerThread.isAlive());
        assertTrue(collection.contains("goodbye!"));
    }

}
Henno Vermeulen
  • Thanks. Mine was similar, I used a Guava `SettableFuture` (promise) to avoid the timeout wait-check loop. – tariksbl Mar 12 '14 at 03:15
  • @tariksbl interesting, having to add that boilerplate seemed like duplicating something that should already be in some concurrency utility. Do you have an implementation for the same interface that passes the same unit test? I don't see how a SettableFuture can make the code easier. Shouldn't the put method then call set on it when an element comes in for which the predicate matches? That would mean you need to maintain a collection of futures and predicates for each query that is currently running. – Henno Vermeulen Mar 12 '14 at 08:32
  • I'm not at liberty to post the code but yes my impl maintains a collection of waiters which is traversed on each `put()`. – tariksbl Mar 12 '14 at 17:02
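The waiter-list approach discussed in these comments could look roughly like the following. This is a sketch under stated assumptions, not tariksbl's code: the class name `WaiterListCollection` and the nested `Waiter` type are hypothetical, it uses the JDK's `CompletableFuture` and `java.util.function.Predicate` instead of Guava's `SettableFuture` and `Predicate`, it assumes non-null elements, and unlike the `query` method above it removes a matched element from the store.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

// Hypothetical sketch: each blocked get() registers a (predicate, future) pair.
class WaiterListCollection<T> {

    private static final class Waiter<T> {
        final Predicate<T> predicate;
        final CompletableFuture<T> future = new CompletableFuture<>();
        Waiter(Predicate<T> predicate) { this.predicate = predicate; }
    }

    private final List<T> store = new ArrayList<>();
    private final List<Waiter<T>> waiters = new ArrayList<>();

    // Hands t to the first waiter whose predicate matches, else stores it.
    public synchronized void put(T t) {
        for (Iterator<Waiter<T>> it = waiters.iterator(); it.hasNext();) {
            Waiter<T> w = it.next();
            if (w.predicate.test(t)) {
                it.remove();
                w.future.complete(t);
                return;
            }
        }
        store.add(t);
    }

    // Returns a matching element, waiting up to millis; null on timeout.
    public T get(Predicate<T> p, long millis) throws InterruptedException {
        Waiter<T> waiter;
        synchronized (this) {
            for (Iterator<T> it = store.iterator(); it.hasNext();) {
                T t = it.next();
                if (p.test(t)) {
                    it.remove();
                    return t;
                }
            }
            waiter = new Waiter<>(p);
            waiters.add(waiter);
        }
        try {
            return waiter.future.get(millis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return withdraw(waiter);
        } catch (InterruptedException e) {
            T late = withdraw(waiter);
            if (late != null) {
                put(late); // hand the value back so it is not lost
            }
            throw e;
        } catch (ExecutionException e) {
            // Not expected: futures here are only ever completed normally.
            throw new IllegalStateException(e);
        }
    }

    // Deregisters the waiter; returns a value if put() completed it just before.
    private synchronized T withdraw(Waiter<T> waiter) {
        waiters.remove(waiter);
        return waiter.future.getNow(null);
    }
}
```

The future replaces the timeout wait-check loop: each consumer blocks on its own future, and `put()` completes at most one of them, so there are no wakeups for non-matching elements.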