3

I am trying to get execution time for reading from redis in reactive programming, on looking up docs I am able to see that elapsed() method will does the same and implemented code as below.

Flux.fromIterable(getActions(httpHeaders))
                .parallel()
                .runOn(Schedulers.parallel())
                .flatMap(actionFact -> methodToReadFromCache(actionFact))
                .sequential();

public Mono<ActionFact> methodToReadFromCache(actionFact) {
    return Mono.fromCallable(() -> getKey(actionFact))
                .flatMap(cacheKey ->
                  redisOperations.hasKey(key)
                                .flatMap(aBoolean -> {
                                    if (aBoolean) {
                                        return redisOperations.opsForValue().get(cacheKey);
                                    }
                                    return authzService.getRolePermissions(actionFact)
                                            .flatMap(policySetResponse ->
                                                    //save in cache
                                            );
                                })
                                .elapsed()
                                .flatMap(lambda -> {
                                    LOG.info("cache/service processing key:{}, time:{}", key, lambda.getT1());
                                    return Mono.just(lambda.getT2());
                                });

Output:

cache/service processing key:KEY1, time:3 
cache/service processing key:KEY2, time:4 
cache/service processing key:KEY3, time:18 
cache/service processing key:KEY4, time:34 
cache/service processing key:KEY5, time:46 
cache/service processing key:KEY6, time:57 
cache/service processing key:KEY7, time:70 
cache/service processing key:KEY8, time:81 
cache/service processing key:KEY9, time:91 
cache/service processing key:KEY10, time:103
cache/service processing key:KEY11, time:112
cache/service processing key:KEY12, time:121
cache/service processing key:KEY13, time:134
cache/service processing key:KEY14, time:146
cache/service processing key:KEY15, time:159

I am expecting that time taken for each of the cache request will be <5 milliseconds like first and second request but not the case. Does elapsed() add current fetching time to the cummulative? As per my understanding each item emmitted from flux is independent?

akreddy.21
  • 626
  • 3
  • 8
  • 21

2 Answers2

2

Mono#elapsed measures the time between when the Mono is subscribed to and the moment the Mono emits an item (onNext).

What causes the subscription and the start of the timer, in your case, is the outer parallelized flatMap that calls methodToReadFromCache.

What causes the onNext and thus what is timed is the combination of hasKey and the if/else part (redisOperations.opsForValue().get(cacheKey) vs authzService).

The outer flatMap should at least as many timers as there are CPUs, since we're in parallel mode.

But the fact that the timings are skewed could hint at the fact that something is either blocking or has limited capacity. For example, could it be that the redisTemplate can only process a few keys at a time?

Simon Baslé
  • 27,105
  • 5
  • 69
  • 70
  • i'm not sure about his code example though notice that his `methodToReadFromCache` method returns void, but he has a return statement in it. The code example is a bit off. – Toerktumlare Jul 12 '19 at 13:16
  • Thanks @Simon Basie, could you please explain this 'The outer flatMap should at least as many timers as there are CPUs, since we're in parallel mode' ? Btw, I am using reactiveRedisTemplate so it shouldn't be blocking – akreddy.21 Jul 15 '19 at 08:47
  • @ThomasAndolf updated the return type, I copied from notePad so missed it. – akreddy.21 Jul 15 '19 at 08:48
  • @Simon, I observed one more behaviour like even though running parallely, while reading from cache only one thread being used but when I turn off cache, I can see that multiple thread being in use to execute service calls. As Redis is single-threaded server, webflux not able to use mutliple threads? – akreddy.21 Jul 15 '19 at 08:55
  • @akreddy.21 what I mean by that is that you use `parallel().runOn(Schedulers.parallel())` so it should divide the work between all the CPUs. Say you have 8 CPUs, this should have resulted in 8 timed Monos in parallel, so 8 `elapsed` durations that should have been equal or close. how does it behave if you replace the scheduler with `Schedulers.newParallel()`? what if you do a `publishOn(Schedulers.newParallel())` just before the last `flatMap` (the one that logs)? – Simon Baslé Jul 15 '19 at 08:59
  • what is weird is that A) the latencies seem a bit high for a test made on local Redis and B) the order isn't mixed up, which I would expect in a parallelized `flatMap`. is this snippet truly representative of what you are doing to obtain these logs? – Simon Baslé Jul 15 '19 at 09:21
  • @SimonBaslé yes exactly same thing except key construction from parameters, I am also seeing the same pattern not mixed up, only single thread is being used for redis connection. – akreddy.21 Jul 15 '19 at 10:17
0

as per the documentation

I want to associate emissions with a timing (Tuple2<Long, T>) measured…​

  • since subscription: elapsed

  • since the dawn of time (well, computer time): timestamp

elapsed is measured time since subscription. So you subscribe and it starts emitting, the time will increase the longer since you subscribed to your service.

official docs

Community
  • 1
  • 1
Toerktumlare
  • 12,548
  • 3
  • 35
  • 54
  • 1
    note that for `Flux` this part of the reference guide is a bit misleading. only the first `onNext` is measured against subscription time. following `onNext` timings are between N and N-1. But here we're dealing with a `Mono` so that holds true. – Simon Baslé Jul 12 '19 at 12:44