
Given some code using streams to process a large number of items, what's the best way to instrument the various steps for logging and performance/profiling?

Actual example:

  ReactiveSeq.fromStream(pairs)
                .filter(this::satisfiesThreshold)
                .filter(this::satisfiesPersistConditions)
                .map((pair) -> convertToResult(pair, jobId))
                .flatMap(Option::toJavaStream)
                .grouped(CHUNK_SIZE)
                .forEach((chunk) ->
                {
                    repository.save(chunk);
                    incrementAndReport();
                });
  reportProcessingTime();

Logging progress is important so I can trigger progress events in another thread that update a user interface.
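As a rough sketch of what the `incrementAndReport()` call in the loop above might do, the class below keeps a thread-safe counter that the streaming thread bumps after each saved chunk and that a UI thread can read at any time. `ProgressTracker`, `increment`, and the listener callback are hypothetical names for illustration, not part of cyclops-react.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

/**
 * Hypothetical progress tracker. The stream thread calls increment(); a UI thread may
 * read processed() or fraction() at any time because the counter is an AtomicLong.
 */
public class ProgressTracker {
    private final AtomicLong processed = new AtomicLong();
    private final long total;
    // Invoked on the stream thread; a UI listener should hand the value off to its own
    // thread (e.g. SwingUtilities.invokeLater in a Swing application).
    private final LongConsumer listener;

    public ProgressTracker(long total, LongConsumer listener) {
        this.total = total;
        this.listener = listener;
    }

    /** Adds count items (e.g. the size of a saved chunk) and notifies the listener. */
    public void increment(long count) {
        listener.accept(processed.addAndGet(count));
    }

    public long processed() {
        return processed.get();
    }

    /** Fraction complete, assuming the total is known up front. */
    public double fraction() {
        return total == 0 ? 1.0 : (double) processed.get() / total;
    }
}
```

Inside the question's `forEach`, this would be used as `repository.save(chunk); tracker.increment(chunk.size());`, assuming each chunk is a `List`.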

Tracking the performance characteristics of the filtering and mapping steps in this stream is desirable, to see where optimizations could speed it up.

I see three options:

  1. put logging/profiling code in each function
  2. use peek around each step without actually using the value
  3. some sort of annotation-based or AOP solution (no idea what)
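A lightweight middle ground between options 1 and 3 is a decorator: wrap each predicate or mapping function in another function that times the call and adds the result to shared counters. The sketch below is plain Java (no AOP framework); `StageTimer`, `timedFilter`, and `timedMap` are hypothetical names, and `LongAdder` is used so that parallel streams can update the counters safely.

```java
import java.util.List;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper: wraps a stage so every call adds its duration to shared counters. */
public class StageTimer {
    private final LongAdder calls = new LongAdder();
    private final LongAdder nanos = new LongAdder();

    public <T> Predicate<T> timedFilter(Predicate<T> p) {
        return t -> {
            long start = System.nanoTime();
            try {
                return p.test(t);
            } finally { // count the call even if the predicate throws
                nanos.add(System.nanoTime() - start);
                calls.increment();
            }
        };
    }

    public <T, R> Function<T, R> timedMap(Function<T, R> f) {
        return t -> {
            long start = System.nanoTime();
            try {
                return f.apply(t);
            } finally {
                nanos.add(System.nanoTime() - start);
                calls.increment();
            }
        };
    }

    public long calls() { return calls.sum(); }

    public long totalNanos() { return nanos.sum(); }

    public static void main(String[] args) {
        StageTimer filterTimer = new StageTimer();
        StageTimer mapTimer = new StageTimer();
        Predicate<Integer> isOdd = i -> i % 2 == 1;
        Function<Integer, String> label = i -> "item-" + i;
        List<String> out = Stream.of(1, 2, 3, 4, 5)
                .filter(filterTimer.timedFilter(isOdd))
                .map(mapTimer.timedMap(label))
                .collect(Collectors.toList());
        System.out.println(out + " filter: " + filterTimer.calls() + " calls, "
                + filterTimer.totalNanos() + " ns; map: " + mapTimer.calls() + " calls, "
                + mapTimer.totalNanos() + " ns");
    }
}
```

Two calls to `System.nanoTime()` per element can easily cost more than a trivial predicate, so these totals are most meaningful for the expensive stages (the conversion or the save).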

Which is the best? Any ideas on what #3 would look like? Is there another solution?

Casey
  • Interesting question, but the intermediate operations don't *actually* do anything until the terminal one is hit, and elements are processed one at a time through the entire pipeline of operations. So measuring with peek (like a start/stop watch) would actually measure only a single element's performance. – Eugene Feb 16 '17 at 09:58
  • More often than not, predicates and mapping functions are far too cheap to produce a meaningful result when instrumented, regardless of which of the three options you use; the added code will be much more expensive than the actual operation. Use a sampling profiler to identify hot spots instead. – Holger Feb 16 '17 at 10:05
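Eugene's point can be checked with plain `java.util.stream`. The sketch below (class name hypothetical) uses `peek` only to record which stage sees which element: in a sequential stream each element travels through the whole pipeline before the next one starts, and nothing runs at all until a terminal operation is invoked. It also shows what option 2 does cheaply, namely counting how many elements reach each stage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PeekTrace {
    /** Runs a small sequential pipeline and records the order in which stages see elements. */
    public static List<String> trace() {
        List<String> events = new ArrayList<>();
        List<Integer> result = Stream.of(1, 2, 3)
                .peek(i -> events.add("source " + i))
                .filter(i -> i != 2)
                .peek(i -> events.add("passed filter " + i))
                .map(i -> i * 10)
                .peek(i -> events.add("mapped " + i))
                .collect(Collectors.toList()); // terminal operation starts the processing
        events.add("result " + result);
        return events;
    }

    /** Builds a pipeline but never calls a terminal operation, so peek never fires. */
    public static int eventsWithoutTerminalOp() {
        List<String> events = new ArrayList<>();
        Stream<Integer> unused = Stream.of(1, 2, 3).peek(i -> events.add("source " + i));
        return events.size();
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
        System.out.println("events without terminal op: " + eventsWithoutTerminalOp());
    }
}
```

Element 2 is rejected by the filter before element 3 is even read, which is why bracketing a single stage with two `peek` timestamps only ever measures one element at a time.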

1 Answer


You have a couple of options here (if I have understood correctly):

  1. We can make use of the elapsed operator to track the elapsed time between element emissions e.g.

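      // Fields on the enclosing class; maxSize is an assumed upper bound on the
      // number of elements that pass the filter
      long[] filterTimeTakenMillis = new long[maxSize];
      int filterIndex = 0;

      // elapsed() pairs each element with the time since the previous emission;
      // record that time (t.v2) and unwrap the original value (t.v1)
      private <T> T logAndUnwrap(Tuple2<T, Long> t) {
          filterTimeTakenMillis[filterIndex++] = t.v2;
          return t.v1;
      }

      // Inside some method of the same class:
      ReactiveSeq.fromStream(Stream.of(1, 2))
                 .filter(this::include)
                 .elapsed()
                 .map(this::logAndUnwrap)
                 .forEach(System.out::println); // terminal operation: nothing runs without one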
    

This will only work on cyclops-react Streams.
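Vanilla Java 8 streams have no `elapsed()` operator. A rough analogue, assuming a sequential stream, is a stateful mapper that pairs each element with the nanoseconds since the previous element reached it. `Elapsed` and `elapsedNanos` are hypothetical names, `Map.Entry` stands in for the cyclops-react `Tuple2`, and the clock is passed in as a `LongSupplier` so that it can be faked in tests. This is not how cyclops-react implements `elapsed()`; it only mimics the idea.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.stream.Stream;

public class Elapsed {
    /**
     * Hypothetical helper: returns a stateful mapper that pairs each element with the
     * nanoseconds since the previous element reached it (the first element is measured
     * from when the mapper was created). Only meaningful on a sequential stream.
     */
    public static <T> Function<T, Map.Entry<T, Long>> elapsedNanos(LongSupplier clock) {
        long[] last = { clock.getAsLong() }; // mutable holder captured by the lambda
        return t -> {
            long now = clock.getAsLong();
            long delta = now - last[0];
            last[0] = now;
            return new SimpleImmutableEntry<>(t, delta);
        };
    }

    public static void main(String[] args) {
        Stream.of(1, 2, 3)
              .filter(i -> i != 2)
              .map(Elapsed.<Integer>elapsedNanos(System::nanoTime))
              .forEach(e -> System.out.println(e.getKey() + " after " + e.getValue() + " ns"));
    }
}
```

Because the mapper holds mutable state, a fresh one should be created for each pipeline.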

  2. We can make use of the AOP-like functionality in FluentFunctions, e.g.

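      // Fields on the same class; maxSize is an assumed upper bound on the element count
      long[] mapTimeTakenNanos = new long[maxSize];
      int mapIndex = 0;

      // Inside some method of the same class:
      ReactiveSeq.fromStream(Stream.of(1, 2))
                 .filter(this::include)
                 .elapsed()
                 .map(this::logAndUnwrap)
                 // "around" advice runs in place of convertToResult and times each call
                 .map(FluentFunctions.of(this::convertToResult)
                                     .around(a -> {
                                         SimpleTimer timer = new SimpleTimer();
                                         String r = a.proceed(); // invokes convertToResult
                                         mapTimeTakenNanos[mapIndex++] = timer.getElapsedNanos();
                                         return r;
                                     }))
                 .forEach(System.out::println); // terminal operation: nothing runs without one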

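The FluentFunctions approach also works with vanilla Java 8 Streams, since the wrapped function can be passed to any map call; only elapsed() is specific to cyclops-react.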

John McClean
  • It would be great to push elapsed times into some (preallocated?) stats buffer instead of stdout. It would be faster (so less distortion of the performance figures), and would simplify later analysis (histograms, etc). – 9000 Feb 16 '17 at 16:48
  • True - I'll update the examples to reflect when I get a chance. – John McClean Feb 16 '17 at 16:54
  • Thanks! The FluentFunctions sample will work well; I can create a few basic logging functions to wrap the conversion functions. Pushing the stats info into a buffer, as 9000 said, lets me emit progress events to keep the user informed of long-running jobs. In this case the performance impact isn't as important as the requirement to emit progress updates. – Casey Feb 16 '17 at 19:56
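Following 9000's suggestion, a preallocated stats buffer could look roughly like the sketch below. `TimingBuffer` is a hypothetical class, not part of cyclops-react. It stores samples in a primitive `long[]` (no boxing), drops samples once full instead of throwing `ArrayIndexOutOfBoundsException` as the fixed-size arrays in the answer would, and computes a mean and a nearest-rank percentile after the run.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical fixed-capacity store for timing samples (e.g. nanoseconds per call).
 * record() may be called from the stream thread(s); read the statistics once the
 * stream has finished, since plain array writes are not published to other threads.
 */
public class TimingBuffer {
    private final long[] samples;
    private final AtomicInteger next = new AtomicInteger();

    public TimingBuffer(int capacity) {
        samples = new long[capacity];
    }

    /** Stores one sample; returns false (and drops it) once the buffer is full. */
    public boolean record(long value) {
        if (next.get() >= samples.length) {
            return false; // early exit keeps the counter from growing without bound
        }
        int i = next.getAndIncrement();
        if (i >= samples.length) {
            return false; // another thread filled the last slot first
        }
        samples[i] = value;
        return true;
    }

    public int size() {
        return Math.min(next.get(), samples.length);
    }

    public double mean() {
        return Arrays.stream(samples, 0, size()).average().orElse(0.0);
    }

    /** Nearest-rank percentile, p in (0, 100]; returns 0 when empty. */
    public long percentile(double p) {
        int n = size();
        if (n == 0) {
            return 0;
        }
        long[] sorted = Arrays.copyOf(samples, n);
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * n);
        return sorted[Math.min(n, Math.max(1, rank)) - 1];
    }
}
```

In the answer's `around` block, the array assignment could then become a call to `record(...)` with the measured nanoseconds, and `size()` doubles as a progress count.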