
I want to use the parallelism that Java 8 Streams provide, but I also need certain operations to be done in a specific order or else everything will break. The problem is that using a stream means that the code is now asynchronous [NOTE: THIS IS INCORRECT], and I can't figure out how to make something happen only when it's finished iterating over the whole set. [NOTE: THIS HAPPENS AUTOMATICALLY]

Right now, my code is as follows:

    public void iterateOverMap(Map<String, String> m)
    {
        AtomicInteger count = new AtomicInteger(0);
        
        m.keySet().stream().forEach((k) -> {
                    Object o = m.get(k);
                    
                    // do stuff with o
                    
                    count.incrementAndGet();
                });
        
        // spin up a new thread to check if the Stream is done
        new Thread(() -> {
            for (;;)
            {
                if (count.intValue() >= m.size()) // the parameter is m, not map
                    break;
            }
            afterFinishedIterating();
        }).start();
    }

I don't like the idea of having to spin up a new thread or block the main thread just to keep track of this one thing, but I can't think of how else I could do it. Does anyone know a better alternative?

Thanks!

ABadHaiku
  • It’s not asynchronous, it’s parallel. Synchronous and parallel. It will return when it completes iteration. Also, don’t busy wait. – Boris the Spider Apr 07 '21 at 22:14
  • @BoristheSpider I only implemented the counter because I was having issues with the program executing `afterFinishedIterating()` before the `forEach` was finished. Also, obviously I shouldn't busy-wait, but what should I do instead of that? – ABadHaiku Apr 07 '21 at 22:18
  • Nothing is asynchronous; I don’t know what problem you’re trying to solve. – Boris the Spider Apr 07 '21 at 22:21
  • A stream can be serial or parallel. If parallel, the processing of stream operations can be handled by temporarily created threads ***while the stream is executing***. This happens "under the covers". If the stream is processed in parallel, your main thread (that initiated the stream pipeline) waits until the terminal operation is complete. A stream is never asynchronous with the initiating thread, but may be parallel internally. – Jim Garrison Apr 07 '21 at 22:51
  • I strongly suggest you very carefully read the [java.util.stream Package Summary Javadoc](https://docs.oracle.com/en/java/javase/15/docs/api/java.base/java/util/stream/package-summary.html). – Jim Garrison Apr 07 '21 at 22:53
  • Also, use `Map#entrySet()` instead of iterating over the keys and re-fetching every entry. – Jim Garrison Apr 07 '21 at 23:11
  • @JimGarrison I do use the key inside the loop, though, so I need access to both parts. – ABadHaiku Apr 07 '21 at 23:12
  • @ABadHaiku Reread the definition of `Map#entrySet()`. You'll see each `Map.Entry<>` contains both the key and the value. – Jim Garrison Apr 07 '21 at 23:14
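The points made in the comments can be sketched as follows. This is a minimal, hypothetical example: `process(String, String)` stands in for "do stuff with o", `afterFinishedIterating(int)` is a simplified stand-in for the asker's method, and the local counter exists only to show that all work is done once `forEach` returns. The terminal operation blocks the calling thread until every element has been processed, so no watcher thread or busy wait is needed, and iterating over `entrySet()` gives key and value together without a second lookup.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class IterateExample {

    // Hypothetical stand-in for "do stuff with o"; in a parallel stream it
    // must be safe to call from several threads at once
    static void process(String key, String value) {
        // ... real work goes here ...
    }

    // Hypothetical follow-up step; runs only after every entry was processed
    static void afterFinishedIterating(int processedCount) {
        System.out.println("Processed " + processedCount + " entries");
    }

    // Returns how many entries were processed, purely for demonstration
    public static int iterateOverMap(Map<String, String> m) {
        AtomicInteger processed = new AtomicInteger(); // demonstration only
        // entrySet() yields key and value together, so no m.get(k) lookup is needed
        m.entrySet().parallelStream().forEach(e -> {
            process(e.getKey(), e.getValue());
            processed.incrementAndGet();
        });
        // forEach is a blocking terminal operation: the calling thread waits here
        // until all worker threads are done, so no watcher thread is required
        int count = processed.get();
        afterFinishedIterating(count);
        return count;
    }

    public static void main(String[] args) {
        Map<String, String> m = new HashMap<>();
        for (int i = 0; i < 1000; i++) {
            m.put("key" + i, "value" + i);
        }
        iterateOverMap(m); // prints "Processed 1000 entries"
    }
}
```

Because the counter is read on the line right after `forEach`, it always equals the map's size; the same holds for any other result the lambda produces.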

2 Answers


Stream processing is synchronous.

If you want an example of how to track the progress of a Stream, you can use the `peek()` intermediate operation, but keep in mind that, according to its Javadoc, it exists mainly to support debugging.

Example taken from my other answer:

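    // readData(), process(), MyData, MyStat and elementsCount come from the
    // context of the original answer and are not shown here
    Stream<MyData> myStream = readData();
    final AtomicInteger loader = new AtomicInteger();
    // Math.max avoids a division by zero when there are fewer than 20 elements
    int fivePercent = Math.max(1, elementsCount / 20);
    Optional<MyStat> result = myStream
        .map(row -> process(row))
        .peek(stat -> {
            // use the value returned by incrementAndGet() so the check and the
            // printout refer to the same count, even in a parallel stream
            int done = loader.incrementAndGet();
            if (done % fivePercent == 0) {
                System.out.println(done + " of " + elementsCount + " elements processed");
                System.out.println((100L * done / elementsCount) + "%");
            }
        })
        .reduce(MyStat::aggregate); // the one-argument reduce returns an Optional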
Yassin Hajaj
  • Why should `peek` not be used in production? Where is that in the docs? – Boris the Spider Apr 07 '21 at 22:23
  • @BoristheSpider we can discuss semantics but the doc says it should only be used for **debugging purposes** which we can conclude is not used in production, unless you're debugging on production of course, which would be... strange... – Yassin Hajaj Apr 07 '21 at 22:25
  • It says “mainly to support” not “only for”. `peek` is also useful for running side effecting operations; not ideal but sometimes necessary when bridging to old APIs. – Boris the Spider Apr 07 '21 at 22:28
  • Sorry, I accidentally made the line `keySet().forEach()` instead of `keySet().stream().forEach()`. Fixed. Also, now I'm confused as to why I was getting synchronicity errors when I didn't have the counter. – ABadHaiku Apr 07 '21 at 22:29
  • @ABadHaiku those two are near completely equivalent. – Boris the Spider Apr 07 '21 at 22:30
  • @BoristheSpider I changed it, hope the wording is now ok – Yassin Hajaj Apr 07 '21 at 22:31
  • @ABadHaiku what do you mean by ***synchronicity errors***? – Yassin Hajaj Apr 07 '21 at 22:32
  • @YassinHajaj The code inside the `forEach` calls methods that check the source data and, if any errors are found, add the error messages to one of a few `Collections.synchronizedList(LinkedList)`s (should probably be a concurrent list). My problem was that when I didn't have the counter, `afterFinishedIterating()`, which in the actual program prints each list to a file, seemed to execute before the `forEach` had finished, because no error messages appeared in the output file. If the thread truly blocks until the `forEach` is done, then I suppose it must have been another problem. – ABadHaiku Apr 07 '21 at 22:37
  • Yes, it's definitely linked to something else. Feel free to post another question if you can't find a solution after looking into it yourself first; I'd be happy to help. – Yassin Hajaj Apr 07 '21 at 22:42
  • I just took another look at the javadocs for `Collections.synchronizedList()`, and wouldn't you know it but "It is imperative that the user manually synchronize on the returned list when traversing it via Iterator, Spliterator or Stream." Turns out I'm not crazy after all! Instead, I'm just using the wrong kind of lists... – ABadHaiku Apr 07 '21 at 22:42
  • @ABadHaiku no worries, programming makes us think we're crazy but most of us are completely sane :D glad you could get there – Yassin Hajaj Apr 07 '21 at 22:43
  • @ABadHaiku 1) [Don’t use `LinkedList`](https://stackoverflow.com/q/322715/2711488) unless you’re confident to be the one in a million who really needs it. 2) Don’t wrap a list in `synchronizedList` but rather use the correct terminal operation (in 99.9% of all cases: *not* `forEach`). Here, `.filter(x -> /* return whether error has been found */) .collect(Collectors.toList())`. This provides the correct result, even in the correct encounter order (unlike `forEach` adding to a synchronized list) without the synchronization overhead. – Holger Apr 08 '21 at 12:28
  • @Holger I thought that `LinkedList` was better for adding and removing, while `ArrayList` was better for retrieving. I have four different lists I'm adding to for different kinds of errors that require different formatting in the output, and I'm iterating over hundreds of data entries, so I don't want to do new loops each time. Now that I think about it, though, `1 loop * 4 operations = 4 loops * 1 operation`. I don't know why I always thought multiple loops would be slower than multiple operations in one loop. Especially with the optimization `.filter()` provides, it might even be faster! – ABadHaiku Apr 09 '21 at 17:44
  • @ABadHaiku Follow the link I gave in the previous comment and read the answers carefully. Even adding to and removing from a `LinkedList` is only better in very special circumstances (this is connected with the way Java’s Collection API interacts with it; the *concept* of linked lists is fine). A stream pipeline with multiple chained operations is still *one* loop. See [Loop fusion of Stream in Java-8 (how it works internally)](https://stackoverflow.com/q/42804226/2711488) and [Java streams lazy vs fusion vs short-circuiting](https://stackoverflow.com/q/35150231/2711488). – Holger Apr 09 '21 at 17:55
  • @Holger By four operations, I meant that the `forEach` code looked like `set.stream().forEach(el -> { checkErrors(el); checkStatus(el); ETC. });` with each of the function calls adding `el`'s ID to a list (with formatting) via side effect if errors were found or if the status was bad. Honestly, if I hadn't thought that I should always avoid multiple loops, I might have realized that I was just doing convoluted `.filter()` operations and that using four `Stream`s of the same set would have been faster and less buggy. Thanks for helping me realize that, lol. – ABadHaiku Apr 09 '21 at 18:38
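Holger's suggestion can be sketched like this. The `Item` class and the `hasError` / `hasBadStatus` checks are hypothetical stand-ins for the asker's data and for `checkErrors` / `checkStatus`; unlike the originals, they return a result instead of adding to a shared list. Each kind of message gets its own `filter(...).map(...).collect(Collectors.toList())` pipeline, as the asker concludes in the last comment, so no synchronized list is needed and the output keeps the source's encounter order even when the stream runs in parallel.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ErrorCollectionExample {

    // Hypothetical data entry; the real program's type is not shown in the question
    static final class Item {
        final int id;
        final String status;
        Item(int id, String status) { this.id = id; this.status = status; }
    }

    // Hypothetical checks standing in for checkErrors(el) / checkStatus(el);
    // they return a boolean instead of adding to a shared list
    static boolean hasError(Item item) {
        return item.id < 0;
    }

    static boolean hasBadStatus(Item item) {
        return !"OK".equals(item.status);
    }

    // collect() merges the per-thread partial results safely and preserves
    // encounter order for an ordered source such as a List
    static List<String> errorMessages(List<Item> items) {
        return items.parallelStream()
            .filter(ErrorCollectionExample::hasError)
            .map(item -> "Invalid ID: " + item.id)
            .collect(Collectors.toList());
    }

    static List<String> statusMessages(List<Item> items) {
        return items.parallelStream()
            .filter(ErrorCollectionExample::hasBadStatus)
            .map(item -> "Item " + item.id + " has status " + item.status)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Item> items = Arrays.asList(
            new Item(1, "OK"), new Item(-2, "OK"), new Item(3, "FAILED"), new Item(-4, "LATE"));
        System.out.println(errorMessages(items));  // [Invalid ID: -2, Invalid ID: -4]
        System.out.println(statusMessages(items)); // [Item 3 has status FAILED, Item -4 has status LATE]
    }
}
```

Since each pipeline is itself a single fused loop, running a few of them over the same collection costs roughly what the asker's one `forEach` with several side-effecting calls did, without the shared mutable state.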

Turns out the problem I was having was with the thread-safe but nondeterministic-when-iterating `Collections.synchronizedList(new LinkedList<>())` lists I was using, not with the Stream usage. A stream is not asynchronous like I had assumed, so the answer is just "you don't have to; it does it for you".
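For anyone who keeps a `Collections.synchronizedList`, the Javadoc quoted in the comments requires holding the list's own lock while traversing it. A minimal sketch, where `writeErrors` is a hypothetical stand-in for the part of `afterFinishedIterating()` that writes a list to a file (here it just builds a string), and `ArrayList` replaces `LinkedList` following Holger's advice:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

public class SynchronizedListExample {

    // Hypothetical helper standing in for writing one error list to a file
    static String writeErrors(List<String> errors) {
        StringBuilder out = new StringBuilder();
        // The Collections.synchronizedList Javadoc requires manual synchronization
        // on the list itself while traversing it via Iterator, Spliterator or Stream
        synchronized (errors) {
            for (String e : errors) {
                out.append(e).append('\n');
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        List<String> errors = Collections.synchronizedList(new ArrayList<>());
        // add() on its own is thread-safe, so parallel workers may call it,
        // but the order in which entries end up in the list is not deterministic
        IntStream.range(0, 100).parallel()
            .filter(i -> i % 10 == 0)
            .forEach(i -> errors.add("error in entry " + i));
        System.out.print(writeErrors(errors));
    }
}
```

The nondeterministic order is exactly why the `filter(...).collect(...)` approach from the comments is usually the better fit.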

ABadHaiku