1

I am unsure how to do this. I'd like to iterate over the whole ConcurrentLinkedQueue, removing each item and running some code on it.

This is what I used to do:

public static class Input {

    public static final ConcurrentLinkedQueue<TreeNode> treeNodes = new ConcurrentLinkedQueue<>();
}

public static class Current {

    public static final ConcurrentHashMap<Integer, TreeNode> treeNodes = new ConcurrentHashMap<>();
}

TreeNode is a simple class. The loop that drains the queue looks like this:

    TreeNode treeNode = Input.treeNodes.poll();

    while (treeNode != null) {

        treeNode.init(gl3);

        Current.treeNodes.put(treeNode.getId(), treeNode);

        treeNode = Input.treeNodes.poll();
    }

This is how I am trying to do it using a stream:

    Input.treeNodes.stream()
            .forEach(treeNode -> {
                Input.treeNodes.remove(treeNode);
                treeNode.init(gl3);
                Current.treeNodes.put(treeNode.getId(), treeNode);
            });

I am afraid that removing the item inside the forEach action may be error prone.

So my question is:

Is this safe and/or are there any better ways to do it?

Holger
elect

2 Answers

2

Just as you've assumed, you should not modify the backing collection while processing the stream, because you might get a ConcurrentModificationException (just as with for (Object o : objectArray) {} loops).

On the other hand, it is not very clear which TreeNode you are trying to remove: in the current case, you seemingly wish to remove all elements from the queue, perform some actions on them, and put them in a Map.

You may safely achieve your current logic via:

Input.treeNodes.stream()
         .map(treeNode -> {
             treeNode.init(gl3);
             Current.treeNodes.put(treeNode.getId(), treeNode);
         });
Input.treeNodes.clear();
Matyas
  • For each `TreeNode` element in `Input.treeNodes`, I want to remove it (as with `poll()`), call its `init()` method and add it to the concurrent hash map `Current.treeNodes` – elect Jul 08 '16 at 09:16
  • Exactly, all of them. I thought calling `poll()` on the `ConcurrentLinkedQueue` in a `while` loop was sufficiently clear. Anyway, thanks – elect Jul 08 '16 at 09:24
  • Then the proposed solution should do the same thing as your `while` loop – Matyas Jul 08 '16 at 09:25
  • PS: may I still get a `ConcurrentModificationException` even if I am using a `ConcurrentLinkedQueue`? – elect Jul 08 '16 at 09:27
  • I just read the documentation, and you are right: you are safe to do so and will not get an exception. I think it would be better to accept the other answer :) – Matyas Jul 08 '16 at 10:09
  • I still would consider removing the elements the OP’s way discouraged. But using `map` to achieve a side effect is discouraged as well and without an actual terminal operation, your Stream operation does nothing. Further, it should be noted that processing the elements in one operation and clearing afterwards may remove unprocessed elements in case of concurrent additions. So a [polling](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html#poll--) loop would be the cleanest solution anyway. – Holger Jul 08 '16 at 10:14
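
The risk of losing elements with "process everything, then `clear()`" can be illustrated with a small single-threaded sketch. Everything here is an assumption made for illustration: the class `LostElementSketch` and its method names are hypothetical, `String` stands in for `TreeNode`, and the concurrent addition is simulated by a plain `add` call placed between processing and `clear()`. A real race would depend on timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

class LostElementSketch {

    // Process everything, then clear: an element arriving in between is dropped unprocessed.
    static List<String> processThenClear(ConcurrentLinkedQueue<String> queue, String lateArrival) {
        List<String> processed = new ArrayList<>();
        queue.forEach(processed::add);
        queue.add(lateArrival); // stands in for another thread adding at an unlucky moment
        queue.clear();          // removes lateArrival although it was never processed
        return processed;
    }

    // Polling loop: each element is removed and processed in one step, so nothing is discarded.
    static List<String> pollAll(ConcurrentLinkedQueue<String> queue) {
        List<String> processed = new ArrayList<>();
        for (String s = queue.poll(); s != null; s = queue.poll()) {
            processed.add(s);
        }
        return processed;
    }
}
```

With the polling variant, an element added after the loop has finished simply stays in the queue until the next drain.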
2

This behavior is determined by the Spliterator used to construct the Stream. The documentation of ConcurrentLinkedQueue.spliterator() says:

Returns a Spliterator over the elements in this queue.
The returned spliterator is weakly consistent.

“Weakly consistent” implies:

Most concurrent Collection implementations (including most Queues) also differ from the usual java.util conventions in that their Iterators and Spliterators provide weakly consistent rather than fast-fail traversal:

  • they may proceed concurrently with other operations
  • they will never throw ConcurrentModificationException
  • they are guaranteed to traverse elements as they existed upon construction exactly once, and may (but are not guaranteed to) reflect any modifications subsequent to construction.

This implies that removing the encountered elements should not have any impact.
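
As a rough illustration of that point, here is a minimal single-threaded sketch. The class `WeaklyConsistentSketch` and its method name are hypothetical, and `Integer` stands in for `TreeNode`. It removes each element from inside `forEach`, as in the question, and collects what was visited.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

class WeaklyConsistentSketch {

    // Removes every encountered element from within the stream's forEach action.
    static List<Integer> removeWhileStreaming(ConcurrentLinkedQueue<Integer> queue) {
        List<Integer> processed = new ArrayList<>();
        queue.stream().forEach(item -> {
            queue.remove(item);  // a weakly consistent spliterator does not throw ConcurrentModificationException
            processed.add(item); // single-threaded here, so a plain ArrayList is enough
        });
        return processed;
    }
}
```

With no other thread touching the queue, every element present when the stream was created should be visited once and the queue should end up empty. A fail-fast collection such as `ArrayList` may instead throw `ConcurrentModificationException` for the same pattern.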

On the other hand, when other threads add or remove elements, the outcome of your Stream operation regarding these elements is unpredictable.

However, you should consider that remove(Object) is not the intended use case for a queue.

Holger
  • Given that `Input.treeNodes` is supposed to be a thread-safe input point for resources that later need to be initialized by the OpenGL thread, which structure would you suggest? A concurrent ArrayList? I got the idea of using a ConcurrentLinkedQueue from [here](http://stackoverflow.com/questions/6916385/is-there-a-concurrent-list-in-javas-jdk) – elect Jul 08 '16 at 12:19
  • I don’t think that using a queue is wrong, even for this use case, i.e. you want to remove and process, which is an intended use case for a queue, when you just remove from the head. But creating a `Stream` doing this is tricky. In Java 9, you could use `Stream.generate(queue::poll).takeWhile(Objects::nonNull)`. But in Java 8, I’d stick with the equivalent loop. – Holger Jul 08 '16 at 12:35
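
The Java 9 expression from the last comment could be wrapped up roughly as follows. This is only a sketch: the class `PollingStreamSketch` and the method name `drain` are hypothetical, and it assumes Java 9 or later for `takeWhile`. Since `Stream.generate` produces an unordered stream, the sketch keeps it sequential.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PollingStreamSketch {

    // Lazily polls the queue; the stream ends at the first null, i.e. when the queue is empty.
    static <T> Stream<T> drain(ConcurrentLinkedQueue<T> queue) {
        return Stream.generate(queue::poll).takeWhile(Objects::nonNull);
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        queue.add("a");
        queue.add("b");
        // Each element is removed from the queue at the moment it is handed to the stream.
        List<String> drained = drain(queue).collect(Collectors.toList());
        System.out.println(drained + " remaining=" + queue.size());
    }
}
```

In the question's setting, the terminal operation would be a `forEach` that calls `init(gl3)` and puts the node into `Current.treeNodes`, which mirrors the original `while` loop.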