1

Suppose I have a Queue<String> and I want to empty the current contents of the queue and do something with each element. Using a loop I could do something like:

while (true) {
    String element = queue.poll();
    if (element == null) {
        return;
    }
    System.out.println(element);
}

This feels a bit ugly. Could I do this better with streams?

Note that there may be other threads accessing the queue at the same time, so relying on the size of the queue to know how many items to poll would be error prone.

Graeme Moss
  • 7,995
  • 4
  • 29
  • 42
  • what happens when `queue.poll` reports null, but there are other threads *after* that, that might put more elements into it? Your stream would be consumed at that time... – Eugene May 16 '17 at 14:43
  • good point, but I don't mind in this case if another thread puts elements on the queue after I call poll() and it returns null – Graeme Moss May 16 '17 at 14:45
  • Than `takeWhile` is your best bet as far as I can tell. I've also written that in my answer, but not only `StreamEx` has generate, but `Stream` also. And `takeWhile` can be added as a static method in your code base until java-9 is there... – Eugene May 16 '17 at 14:47

5 Answers5

6

Since you asked about “without blocking”, it seems you are referring to a BlockingQueue. In that case, it’s recommended to avoid repeatedly calling poll().

Instead, transfer all pending elements to a local collection in one go, then process them:

List<String> tmp = new ArrayList<>();
queue.drainTo(tmp);
tmp.forEach(System.out::println);

You may also avoid synchronizing on System.out (implicitly) multiple times:

List<String> tmp=new ArrayList<>();
queue.drainTo(tmp);
System.out.println(tmp.stream().collect(Collectors.joining(System.lineSeparator())));

or

List<String> tmp=new ArrayList<>();
queue.drainTo(tmp);
System.out.println(String.join(System.lineSeparator(), tmp));

(though that doesn’t bear a stream operation)…

Holger
  • 285,553
  • 42
  • 434
  • 765
  • I was using `ConcurrentLinkedQueue` which is not a `BlockingQueue` but that's not fixed in my mind so I could change this. – Graeme Moss May 16 '17 at 14:57
  • @Holger but isn't `drainTo` going to block all other `add`? i don't think this is that the OP wants – Eugene May 16 '17 at 15:00
  • 2
    @Eugene: if you `poll` *n* elements, you are blocking the queue for at least the same amount of time you are blocking it when transferring *n* items via `drainTo`. Usually, the overhead of making *n* distinct calls locking *n* times is much higher, resulting in a much longer blocking time than `drainTo` performing only one lock operation. If you are trying to optimize the max pause time, you may pass a *maxElements* argument to `drainTo` and ensure that the target list’s capacity is on par. – Holger May 16 '17 at 15:38
  • 2
    @Graeme Moss: using a `ConcurrentLinkedQueue` with `poll()` might be the right thing if you want to support concurrent `add` operations, but that actually doesn’t fit into the mind set of the “*I want to empty the current contents of the queue*” task. When polling while concurrent addition is allowed, there is no actual “current contents”. – Holger May 16 '17 at 15:46
  • Removes all available elements from this queue and adds themto the given collection. This operation may be moreefficient than repeatedly polling this queue. A failureencountered while attempting to add elements tocollection c may result in elements being in neither,either or both collections when the associated exception isthrown. Attempts to drain a queue to itself result in IllegalArgumentException. Further, the behavior ofthis operation is undefined if the specified collection ismodified while the operation is in progress. – Milan Jun 17 '21 at 09:51
1

You don't need to use streams to make the code less ugly (a stream solution would probably be more ugly).

String s = null;
while((s = queue.poll()) != null)
    System.out.println(s);
Kayaman
  • 72,141
  • 5
  • 83
  • 121
  • I don't really like assigning within a test. Is there another way? – Graeme Moss May 16 '17 at 14:11
  • 1
    `String s = queue.poll(); while (s != null) { System.out.println(s); s = queue.poll(); }` -- essentially what @Kayaman wrote without the "ugliness" of the break of the original question. But what Kayaman wrote is more than acceptable. – KevinO May 16 '17 at 14:14
  • 4
    In my mind assigning in a `while` condition was a common idiom in C. I don’t like it in Java either. Matter of taste and habit, though. Follow what your team does. – Ole V.V. May 16 '17 at 14:33
1

The best answer I can come up with at present is the following:

StreamEx.generate(() -> queue.poll())
        .takeWhile(Objects::nonNull)
        .forEach(System.out::println);

but that uses a library (StreamEx). Can we do this with vanilla Java?

Graeme Moss
  • 7,995
  • 4
  • 29
  • 42
  • 3
    How on earth is this better than using the simplest tools available? Streams didn't deprecate things like for and while loops. You don't get any bonus points for using streams where they don't add any value. – Kayaman May 16 '17 at 14:15
  • 1
    @GraemeMoss the only thing that is not available in vanilla java (at the moment - it will in jdk-9) is `takeWhile`; that can be put into a static method inside your project. http://stackoverflow.com/questions/20746429/limit-a-stream-by-a-predicate – Eugene May 16 '17 at 14:17
0

As per my understanding, if you want to use stream then you can't modify the same collection on which you are doing operations.

Below code might help you:

queue.stream().forEach(e -> {
        System.out.println(e);
    });
queue.clear();
Vip
  • 1,448
  • 2
  • 17
  • 20
0

I don’t think you want the complication, but if I am wrong, you may do:

    Spliterator<String> spliterator = new Spliterators.AbstractSpliterator<String>(queue.size(), 0) {
        @Override
        public boolean tryAdvance(Consumer<? super String> action) {
            String element = queue.poll();
            if (element == null) {
                return false;
            } else {
                action.accept(element);
                return true;
            }
        }
    };
    StreamSupport.stream(spliterator, false).forEach(System.out::println);

You may look into the docs of Spliterators and StreamSupport for numerous possible refinements.

Ole V.V.
  • 81,772
  • 15
  • 137
  • 161