32

I have a Stream processing a few million elements. The Map-Reduce algorithm behind it takes a few milliseconds per element, so the whole task takes about twenty minutes.

Stream<MyData> myStream = readData();
MyResult result = myStream
    .map(row -> process(row))
    .peek(stat -> System.out.println("Hi, I processed another item"))
    .reduce(MyStat::aggregate);

I'd like a way to display overall progress, instead of printing a line per element (which results in thousands of lines per second, takes time and doesn't provide any useful information regarding overall progress). I would like to display something similar to:

 5% (08s)
10% (14s)
15% (20s)
...

What would be the best (and/or easiest) way to do this?

Yassin Hajaj
GnxR
  • Given the code you provided, it is not possible since there is no `size()` method (or equivalent) for `Stream`s. – Turing85 Jun 10 '18 at 16:04
  • 1
    You could create a MutableInteger class and use that to count how many you've peeked since you last printed and print every thousand. It's very anti-functional though – Richard Tingle Jun 10 '18 at 16:07
  • 2
    Do you know the number of elements in the stream before processing? If not, how do you estimate progress? – ernest_k Jun 10 '18 at 16:22
  • It's impossible if neither the execution time nor the size of the stream is given. – Andrew Tobilko Jun 10 '18 at 16:22
  • Thank you for these answers. The question is perhaps more interesting if I say I want to display something once every 10k elements have been processed (as understood by @RichardTingle). Even though I have a means to get the number of elements, I'm more interested in knowing how to display something once every N “iterations”. I can easily turn that into a percentage once I get there. – GnxR Jun 10 '18 at 16:39
  • 1
    You could note the starting time and only print if N seconds had elapsed, instead of every N items. I've done that before. That way you get a more consistent rate of output. You can even put the printing behind a class that knows about the time, N seconds, etc. so it doesn't complicate your main logic. – David Conrad Jun 11 '18 at 01:26
  • You might want to transform this from a stream to a for loop. If the place you're getting the Stream from can't easily be made to give you an Iterable, you can get an Iterator from the stream. – David Conrad Jun 11 '18 at 01:29
  • @DavidConrad The reason I'm not using an Iterable is that there are lots of objects, and storing them all in memory (which I believe is how Collections work) takes a very long time, eats up the RAM, and sometimes crashes the program. The base application used `for` loops, and I made it 5× faster by switching to streams. However, I don't know all of Java, so if there are stream-like (memory-wise) ways of iterating, I'm interested. – GnxR Jun 11 '18 at 17:51
  • 1
    Iterables and Iterators don't have to have anything to do with Collections, and you can get an Iterator from any stream with [`Stream::iterator()`](https://docs.oracle.com/javase/8/docs/api/java/util/stream/BaseStream.html#iterator--) – David Conrad Jun 11 '18 at 21:52
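
Building on the time-based suggestion in the comments above (print only after a given interval has elapsed, rather than after every N items), a minimal sketch might look like the following. The `ProgressLogger` helper, its names and its output format are assumptions for illustration, not part of the original question:

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: prints at most roughly once per interval,
// no matter how many elements are processed in between.
class ProgressLogger {
    private final long startMillis = System.currentTimeMillis();
    private final long intervalMillis;
    private final AtomicLong processed = new AtomicLong();
    private volatile long lastPrintMillis = startMillis;

    ProgressLogger(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    void tick() {
        long count = processed.incrementAndGet();
        long now = System.currentTimeMillis();
        if (now - lastPrintMillis >= intervalMillis) {
            lastPrintMillis = now;
            System.out.printf("%d elements processed (%ds elapsed)%n",
                count, (now - startMillis) / 1000);
        }
    }
}

It could then be hooked into the existing pipeline with something like `.peek(stat -> progress.tick())`, where `progress` is a `ProgressLogger` created with, say, a 5000 ms interval before the stream runs. With a parallel stream the check on `lastPrintMillis` is racy, so an occasional extra line may be printed, which is usually acceptable for progress output.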

3 Answers

19

First of all, Streams are not meant for this kind of task (as opposed to a classic data structure). If you already know how many elements your stream will be processing, you might go with the following option, which is, I repeat, not what streams are designed for.

// Requires: import java.util.concurrent.atomic.AtomicInteger;
Stream<MyData> myStream = readData();
final AtomicInteger loader = new AtomicInteger();
// elementsCount is the total number of elements, known in advance (assumed >= 20)
int fivePercent = elementsCount / 20;
MyResult result = myStream
    .map(row -> process(row))
    .peek(stat -> {
        if (loader.incrementAndGet() % fivePercent == 0) {
            System.out.println(loader.get() + " elements out of " + elementsCount + " processed");
            System.out.println((5 * (loader.get() / fivePercent)) + "%");
        }
    })
    .reduce(MyStat::aggregate);
Yassin Hajaj
  • 7
    I understand streams are pure beings, but on the other hand progress indication sometimes comes in handy when dealing with massive amounts of data… :/ – GnxR Jun 10 '18 at 16:51
  • 3
    @GnxR You're right indeed, but I needed to point out that it goes beyond the architectural purpose of streams, and we therefore need a workaround. – Yassin Hajaj Jun 10 '18 at 17:03
  • In my case, I want to use streams to process data because the software built around stream processing can easily enable high redundancy/resiliency and huge throughput. However, the data in my different streams are related and do form a finite set of elements. I guess I need a different data structure to store this relationship, and a good answer to this question might discuss approaches to doing that. – deed02392 Apr 29 '20 at 11:41
9

As others have pointed out: This has some caveats. First of all, streams are not supposed to be used for something like this.

On a more technical level, one could further argue:

  • A stream can be infinite
  • Even if you know the number of elements: this number might be distorted by operations like filter or flatMap (see the snippet after this list)
  • For a parallel stream, tracking the progress will enforce a synchronization point
  • If there is a terminal operation that is expensive (like the aggregation in your case), then the reported progress might not even sensibly reflect the computation time
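
For instance (an illustrative snippet, not part of the original answer), a counter placed before a flatMap no longer matches the number of elements that reach the terminal operation:

import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class DistortedCount
{
    public static void main(String[] args)
    {
        AtomicLong beforeFlatMap = new AtomicLong();
        AtomicLong afterFlatMap = new AtomicLong();

        Stream.of(1, 2, 3)
            .peek(x -> beforeFlatMap.incrementAndGet())
            // each element x is expanded into x elements
            .flatMap(x -> IntStream.range(0, x).boxed())
            .forEach(x -> afterFlatMap.incrementAndGet());

        System.out.println(beforeFlatMap.get()); // 3
        System.out.println(afterFlatMap.get());  // 6
    }
}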

However, keeping this in mind, one approach that might be reasonable for your application case is this:

You could create a Function<T,T> that is passed to a map call on the stream. (At least, I'd prefer that over using peek on the stream, as suggested in another answer.) This function could keep track of the progress, using an AtomicLong for counting the elements. In order to keep separate things separate, this progress could then simply be forwarded to a Consumer<Long>, which takes care of the presentation.

The "presentation" here refers to printing this progress to the console, normalized or as a percentage, relative to a size that could be known wherever the consumer is created. But the consumer can also take care of printing, for example, only every 10th element, or only printing a message if at least 5 seconds have passed since the previous one.

import java.util.Iterator;
import java.util.Locale;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamProgress
{
    public static void main(String[] args)
    {
        int size = 250;
        Stream<Integer> stream = readData(size);

        LongConsumer progressConsumer = progress -> 
        {
            // "Filter" the output here: Report only every 10th element
            if (progress % 10 == 0)
            {
                double relative = (double) progress / (size - 1);
                double percent = relative * 100;
                System.out.printf(Locale.ENGLISH,
                    "Progress %8d, relative %2.5f, percent %3.2f\n",
                    progress, relative, percent);
            }
        };

        Integer result = stream
            .map(element -> process(element))
            .map(progressMapper(progressConsumer))
            .reduce(0, (a, b) -> a + b);

        System.out.println("result " + result);
    }

    private static <T> Function<T, T> progressMapper(
        LongConsumer progressConsumer)
    {
        AtomicLong counter = new AtomicLong(0);
        return t -> 
        {
            long n = counter.getAndIncrement();
            progressConsumer.accept(n);
            return t;
        };

    }

    private static Integer process(Integer element)
    {
        return element * 2;
    }

    private static Stream<Integer> readData(int size)
    {
        Iterator<Integer> iterator = new Iterator<Integer>()
        {
            int n = 0;
            @Override
            public Integer next()
            {
                try
                {
                    Thread.sleep(10);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
                return n++;
            }

            @Override
            public boolean hasNext()
            {
                return n < size;
            }
        };
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(
                iterator, Spliterator.ORDERED), false);
    }
}
Marco13
  • Sonar suggests using `LongConsumer` rather than `Consumer` (but Sonar can be pretty fussy) – Andrew Spencer May 09 '19 at 07:13
  • 2
    @AndrewSpencer Not in this case. `LongConsumer` actually makes sense here. It's only a minor difference, but I changed it accordingly. – Marco13 May 09 '19 at 09:47
1

Whether this is possible depends largely on the type of source behind the stream. If you have a collection and want to apply some operations to it, you can do it, because you know the size of the collection and can keep a count of processed elements (a rough sketch of this follows below). Even in this case there is a caveat: parallel computations in the stream make it more difficult.

When you are streaming data from outside the application, it is very difficult to model progress, as you don't know when the stream will end.
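
As a rough sketch of the known-size case described above (the data set, the 10% step and the class name are illustrative assumptions, not taken from the question):

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class KnownSizeProgress {
    public static void main(String[] args) {
        // Illustrative data; in practice this would be the real collection.
        List<Integer> data = IntStream.range(0, 1_000)
            .boxed()
            .collect(Collectors.toList());

        long total = data.size();            // known before processing starts
        long step = Math.max(1, total / 10); // report every 10%
        AtomicLong seen = new AtomicLong();  // thread-safe, so a parallel stream also works

        int sum = data.stream()
            .peek(x -> {
                long n = seen.incrementAndGet();
                if (n % step == 0) {
                    System.out.printf("%d%% (%d/%d)%n", 100 * n / total, n, total);
                }
            })
            .mapToInt(Integer::intValue)
            .sum();

        System.out.println("sum = " + sum);
    }
}

With a parallel stream the percentage lines may appear out of order, but the counts themselves remain correct thanks to the AtomicLong.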

NiVeR