
I am trying to create a Caffeine cache for my Spring WebFlux application.

Here is the implementation:

private final Cache<String, OfferData> localCache = Caffeine.newBuilder()
        .expireAfterWrite(1, TimeUnit.MINUTES)
        .initialCapacity(1)
        .removalListener((RemovalListener<String, OfferData>) (key, value, cause) -> {
            log.debug(">> Caffeine Cache => Key: {} | Value: {} | Cause: {}", key, value, cause);
        })
        .recordStats()
        .build();

The caveat is that I cannot use refreshAfterWrite here. It applies only to a LoadingCache, and I cannot use a LoadingCache because the loader function returns a Mono.

Loader function below:

@Retryable(value = {RuntimeException.class}, maxAttempts = 2, backoff = @Backoff(delayExpression = "${service.offer.local-cache-load-retry-delay}"))
private Mono<OfferData> loadLocalCache() {
    log.info(LOG_LOAD_LOCAL_CACHE + "Re-loading local offer cache...");
    log.info(LOG_LOAD_LOCAL_CACHE + "Fetching offers from external cache...");

    return fetchOffersFromExternalCache()
            .doOnNext(offerData -> log.info(LOG_LOAD_LOCAL_CACHE + "Ads fetched successfully from external cache."))
            .onErrorResume(throwable -> {
                // Failed to load from the external cache. No data present in the cache.
                // Loading the data from Ads Manager to the cache
                log.error(LOG_LOAD_LOCAL_CACHE + "No offer data could be fetched from external cache: " + throwable);
                log.info(LOG_LOAD_LOCAL_CACHE + "Fetching offers from source...");

                // If data cannot be retrieved from the source, we do not populate the internal cache with anything
                // for that key. We leave it blank for the next call to try and populate it.
                return fetchOffersFromSource()
                        .doOnNext(offerData -> storeOffersInExternalCache(offerData, offerServiceProperties.getExternalCacheExpiration()))
                        .doOnError(sourceThrowable -> log.error(LOG_LOAD_LOCAL_CACHE + "Offer data not available at source: {}", sourceThrowable.getMessage()));
            });
}

My implementation to extract the value from Mono and then store it in the cache:

public Mono<List<RuntimeOfferDetails>> getOffers(String eventName) {
    return CacheMono.lookup(
                    k -> Mono.justOrEmpty(localCache.getIfPresent(k)).map(Signal::next),
                    OFFER_DATA_KEY
            ).onCacheMissResume(this::loadLocalCache)
            .andWriteWith((k, sig) -> Mono.fromRunnable(() -> localCache.put(k, Objects.requireNonNull(sig.get()))))
            .map(offerData -> getOfferDetails(offerData, eventName))
            .map(offerDetailsList -> offerDetailsList.stream()
                    .map(RuntimeOfferDetails::toRuntimeOfferDetails)
                    .collect(Collectors.toList())
            ).doOnError(throwable -> {
                log.error(LOG_GET_OFFERS + "No offer data found in the local cache, and none could be retrieved from the external cache or the source: {}", throwable.getMessage());
                meterRegistry.counter(METRIC_FETCH_RUNTIME_OFFER_DETAILS, TAG_OUTCOME, FAILURE).increment();
            }).doOnSuccess(runtimeOfferDetails -> {
                log.info(LOG_GET_OFFERS + "Runtime offer details retrieved successfully.");
                log.debug(LOG_GET_OFFERS + "Runtime offer details: [{}]", runtimeOfferDetails);
                meterRegistry.counter(METRIC_FETCH_RUNTIME_OFFER_DETAILS, TAG_OUTCOME, SUCCESS).increment();
            }).switchIfEmpty(Mono.error(new OfferServiceException("No offer data found for the event: " + eventName)));
}

I can control the cache with the expireAfterWrite property, but that only evicts keys from the in-memory cache. I want the same control that refreshAfterWrite gives, but for a cache that is not a LoadingCache. How can I implement this?

  • Use the future formulation for Caffeine; you can readily convert a `Mono` to a `Future`. https://github.com/ben-manes/caffeine/wiki/Population#asynchronously-loading – Boris the Spider Jan 29 '22 at 06:25
  • @BoristheSpider But the problem here is, if I create the cache with >, this does not work as well. It creates a new publisher every time the cache is called. I have to extract the value and then store it in the cache. I have added the code where I am actually calling the loader function and then extracting the value to save it in the cache. – Gurucharan Sharma Jan 29 '22 at 06:47
  • My comment was in response to “_I cannot use the Loading Cache because the loader function returns a Mono._”. Just use an `AsyncLoadingCache`. – Boris the Spider Jan 29 '22 at 06:49
  • Similarly “_creates a new publisher every time the cache is called_” - see `Mono.cache`. – Boris the Spider Jan 29 '22 at 06:51
  • I read on one of the threads on an on-going issue: _Based on what I read in #254 it sounds the same. Which means that the AsyncLoadingCache cannot be used with reactor._ Details here: https://github.com/ben-manes/caffeine/issues/260 – Gurucharan Sharma Jan 29 '22 at 10:13
  • @BoristheSpider is correct and it is fully compatible with Reactor (and their [recommended approach](https://github.com/reactor/reactor-addons/issues/237)). There is nothing wrong in #254 / #260 except mistaken expectations. With asynchronous, concurrent, shared state you have to expect that a failure cannot be removed instantaneously and will be visible as a decoupled execution. Otherwise you need to block on the computation within the hashmap, like the synchronous cache does. Those reports stem from a lack of familiarity, as concurrent programming is a harder subject. – Ben Manes Jan 29 '22 at 18:07
  • I need some more help. First, I am not sure how I can convert a Mono to a Future. The problem is that the function I am supposed to call to load the cache returns a Mono. If I convert it into a CompletableFuture, it will return a Future. Getting the data from the Future is again a blocking call, right? Moreover, Spring WebFlux and CompletableFuture are two different interfaces/implementations. How do I combine the two in the same flow? – Gurucharan Sharma Jan 29 '22 at 19:45
  • I am really sorry if I am confusing things. But it would be really good if you could just share some resources for me to read and research. – Gurucharan Sharma Jan 29 '22 at 19:45
  • @BoristheSpider the link that you shared in the first comment contains no reference to _you can readily convert a Mono to a Future_. Could you share something that could help me? – Gurucharan Sharma Jan 29 '22 at 19:46
  • As already linked by @benmanes above, this is the recommended approach by Reactor themselves. https://github.com/reactor/reactor-addons/issues/237 – Boris the Spider Jan 29 '22 at 19:53
  • @GurucharanSharma Please see `Mono.toFuture()` and `Mono.fromFuture(f)`. – Ben Manes Jan 29 '22 at 19:55
  • Thank you so much. Really appreciate the help. :) – Gurucharan Sharma Jan 29 '22 at 19:56
  • @BenManes / Boris, would you please take a look at the document - https://docs.google.com/document/d/1r5qDkbPCtdTtRzKR8QgKrGhy45sfrGDUGYh9RiH0WZw/edit?usp=sharing. I tried the solutions that were suggested on this forum yesterday but I am still facing some issues. I need to understand what I am doing wrong here. Thank you. – Gurucharan Sharma Jan 31 '22 at 06:36
  • @GurucharanSharma You are using `getIfPresent(key)` which is a lookup without invoking the loader if absent. You want `get(key)` to load on a miss, which will invoke your function. – Ben Manes Jan 31 '22 at 06:38
  • Thanks a lot, Ben. It is now working as expected. I really appreciate your support. Thank you so much. :) – Gurucharan Sharma Jan 31 '22 at 06:52
  • @GurucharanSharma where is the complete working solution? – Dexter Legaspi Jul 14 '23 at 23:35
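
No complete solution was posted in the thread. The comments point to an `AsyncLoadingCache` whose loader converts the `Mono` with `Mono.toFuture()`, read through `get(key)` rather than `getIfPresent(key)`. The sketch below is not Caffeine and not the asker's final code. It is a JDK-only model of why that combination works: the cache stores the future itself, so a miss triggers one load that all callers share, and the result is composed on without blocking. The class name `FutureCacheSketch`, its loader, and the key and value strings are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// JDK-only model of an asynchronous loading cache; NOT Caffeine's implementation.
class FutureCacheSketch<K, V> {
    private final ConcurrentHashMap<K, CompletableFuture<V>> entries = new ConcurrentHashMap<>();
    // Assumption: the loader never returns null (it may return a failed future).
    private final Function<K, CompletableFuture<V>> loader;

    FutureCacheSketch(Function<K, CompletableFuture<V>> loader) {
        this.loader = loader;
    }

    // Lookup only: returns null on a miss and never invokes the loader.
    CompletableFuture<V> getIfPresent(K key) {
        return entries.get(key);
    }

    // Load on miss: the loader runs at most once per absent key, and every
    // caller receives the same future instead of starting a new load.
    CompletableFuture<V> get(K key) {
        CompletableFuture<V> future = entries.computeIfAbsent(key, loader);
        // Drop a failed load so that a later call can try again.
        future.whenComplete((value, error) -> {
            if (error != null) {
                entries.remove(key, future);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        AtomicInteger loads = new AtomicInteger();
        FutureCacheSketch<String, String> cache = new FutureCacheSketch<>(
                key -> CompletableFuture.supplyAsync(() -> "offers#" + loads.incrementAndGet()));

        // A plain lookup on an empty cache is a miss and does not load anything.
        System.out.println(cache.getIfPresent("OFFER_DATA")); // prints "null"

        // Compose on the future rather than blocking on it; join() is used
        // here only to keep the demo alive until the result is printed.
        cache.get("OFFER_DATA")
                .thenApply(String::toUpperCase)
                .thenAccept(System.out::println) // prints "OFFERS#1"
                .join();

        // The second get() is a hit, so the loader is not invoked again.
        cache.get("OFFER_DATA").join();
        System.out.println("loads = " + loads.get()); // prints "loads = 1"
    }
}
```

With Caffeine and Reactor, the wiring suggested in the comments would be roughly `Caffeine.newBuilder().refreshAfterWrite(1, TimeUnit.MINUTES).buildAsync((key, executor) -> loadLocalCache().toFuture())`, read with `Mono.fromFuture(cache.get(OFFER_DATA_KEY))`. The one-minute interval is only an example value, and this has not been checked against the asker's project.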

0 Answers