
I'm looking for a queue that would be the asynchronous (non-blocking) equivalent of java.util.concurrent.BlockingQueue. Its interface would include:

public interface AsynchronousBlockingQueue<E> {
    // - if the queue is empty, return a new CompletableFuture,
    //   that will be completed next time `add` is called
    // - if the queue is not empty, return a completed CompletableFuture,
    //   containing the first element of the list
    public CompletableFuture<E> poll();

    // if polling is in progress, complete the ongoing polling CompletableFuture.
    // otherwise, add the element to the queue
    // (`synchronized` is not allowed on an interface method; the implementation handles locking)
    public void add(E element);
}

In case it matters, there will be just one poller thread, and polling will be done sequentially (poll will not be called while a previous poll is still in progress).

I expected it to already exist in the JVM, but I couldn't find it, and of course I'd rather use something from the JVM than write it myself.

Another constraint: I'm stuck with Java 8 (although I'm definitely interested in knowing what exists in more recent versions).

Naman
yannick1976
  • how about [that](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html) ? – Vault23 Oct 17 '19 at 14:32
  • Possibly related to [this](https://stackoverflow.com/questions/7318166/being-asynchronously-notified-of-a-blockingqueue-having-an-item-available) and [this](https://stackoverflow.com/questions/32793159/asynchronous-execution-in-blockingqueue). Interesting question. – Naman Oct 17 '19 at 14:35
  • @Naman yes they're the ones I saw... but they don't really answer :) – yannick1976 Oct 17 '19 at 14:46
  • @Vault23 ConcurrentLinkedQueue has nothing asynchronous (or blocking, for that matter), it's just thread-safe – yannick1976 Oct 17 '19 at 14:47

1 Answer


So finally I wrote my own class... Interested in comments :)

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class AsynchronousBlockingQueue<E> {
    // both fields are only read or written inside synchronized methods
    private CompletableFuture<E> incompletePolling = null;
    private final Queue<E> elementsQueue = new LinkedList<>();

    // if the queue is empty, return a new CompletableFuture, that will be completed next time `add` is called
    // if the queue is not empty, return a completed CompletableFuture containing the first element of the list
    public synchronized CompletableFuture<E> poll() {
        // polling must be done sequentially, so this shouldn't be called if there is a poll ongoing.
        if (incompletePolling != null)
            throw new IllegalStateException("Polling is already ongoing");
        if (elementsQueue.isEmpty()) {
            incompletePolling = new CompletableFuture<>();
            return incompletePolling;
        }
        return CompletableFuture.completedFuture(elementsQueue.poll());
    }

    // if polling is in progress, complete the ongoing polling CompletableFuture.
    // otherwise, add the element to the queue
    public synchronized void add(E element) {
        if (incompletePolling != null) {
            CompletableFuture<E> result = incompletePolling;
            // removing must be done first because the completion could trigger code that needs the queue state to be valid
            incompletePolling = null;
            result.complete(element);
            return;
        }
        elementsQueue.add(element);
    }
}
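
For what it's worth, here is a minimal usage sketch of how a single sequential poller could consume from this class. The names `AsyncQueueDemo`, `consumeNext` and `run` are made up for the illustration, and the nested class is just a compact copy of the one above so that the sketch compiles on its own. It assumes the caller re-arms the poll from inside the completion callback, which works here only because `synchronized` is reentrant and the callback runs on the thread that calls `add`.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class AsyncQueueDemo {
    // Compact copy of the class above, nested so this sketch is self-contained.
    static class AsynchronousBlockingQueue<E> {
        private CompletableFuture<E> incompletePolling = null;
        private final Queue<E> elementsQueue = new LinkedList<>();

        public synchronized CompletableFuture<E> poll() {
            if (incompletePolling != null)
                throw new IllegalStateException("Polling is already ongoing");
            if (elementsQueue.isEmpty()) {
                incompletePolling = new CompletableFuture<>();
                return incompletePolling;
            }
            return CompletableFuture.completedFuture(elementsQueue.poll());
        }

        public synchronized void add(E element) {
            if (incompletePolling != null) {
                CompletableFuture<E> result = incompletePolling;
                incompletePolling = null; // reset first: the callback may call poll() again
                result.complete(element);
                return;
            }
            elementsQueue.add(element);
        }
    }

    // Hypothetical helper: handles the next element, then polls again
    // until the sentinel value "stop" is received.
    static void consumeNext(AsynchronousBlockingQueue<String> queue, List<String> received) {
        queue.poll().thenAccept(element -> {
            received.add(element);
            if (!element.equals("stop")) {
                consumeNext(queue, received);
            }
        });
    }

    static List<String> run() {
        AsynchronousBlockingQueue<String> queue = new AsynchronousBlockingQueue<>();
        List<String> received = new ArrayList<>();
        queue.add("a");               // no poll pending: the element is queued
        consumeNext(queue, received); // receives "a" at once, then leaves a poll pending
        queue.add("b");               // completes the pending poll; the callback re-arms it
        queue.add("stop");            // completes the next poll; the consumer stops polling
        queue.add("ignored");         // nobody is polling any more: stays in the queue
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a, b, stop]
    }
}
```

One thing this makes visible: the completion callback runs while `add` still holds the lock, so a slow callback blocks other producers. Completing the future outside the synchronized block, or using `thenAcceptAsync`, would be possible variations, at the cost of more care about ordering.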
yannick1976