28

I'm learning Spring WebFlux and during writing a sample application I found a concern related to Reactive types (Mono/Flux) combined with Spring Cache.

Consider the following code-snippet (in Kotlin):

@Repository
interface TaskRepository : ReactiveMongoRepository<Task, String>

@Service
class TaskService(val taskRepository: TaskRepository) {

    @Cacheable("tasks")
    fun get(id: String): Mono<Task> = taskRepository.findById(id)
}

Is this valid and safe way of caching method calls returning Mono or Flux? Maybe there are some other principles to do this?

The code above works with SimpleCacheResolver, but by default it fails with Redis because Mono is not Serializable. To make it work, a serializer such as Kryo has to be used.

Tomek Zaremba
  • 353
  • 1
  • 4
  • 7

3 Answers

44

Hack way

For now, there is no fluent integration of @Cacheable with Reactor 3. However, you can work around that by adding the .cache() operator to the returned Mono:

@Repository
interface TaskRepository : ReactiveMongoRepository<Task, String>

@Service
class TaskService(val taskRepository: TaskRepository) {

    @Cacheable("tasks")
    fun get(id: String): Mono<Task> = taskRepository.findById(id).cache()
}

That hack caches and shares the data returned from taskRepository. In turn, Spring's @Cacheable caches a reference to the returned Mono and then returns that reference. In other words, it is a cache of a Mono which itself holds the cache :).
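To illustrate the mechanics (the taskService variable and the id "42" below are made-up usage, not part of the original snippet): the second call gets back the very same Mono instance from the Spring cache, and thanks to .cache() the repository is subscribed to at most once.

val first = taskService.get("42")   // @Cacheable miss: the method runs and the resulting Mono is stored
val second = taskService.get("42")  // @Cacheable hit: the same Mono reference is returned
// first === second; findById() executes only once, on the first subscription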

Reactor Addons Way

There is an add-on for Reactor 3 which allows fluent integration with modern in-memory caches like Caffeine, JCache, etc. Using that technique you will be able to cache your data easily:

@Repository
interface TaskRepository : ReactiveMongoRepository<Task, String>

@Service
class TaskService(
    val taskRepository: TaskRepository,
    val cacheManager: CacheManager
) {

    fun get(id: String): Mono<Task> = CacheMono
        .lookup(reader(), id)
        .onCacheMissResume { taskRepository.findById(id) }
        .andWriteWith(writer())

    // Look up a previously stored Signal<Task> in the "tasks" cache; an empty Mono means a miss
    fun reader(): CacheMono.MonoCacheReader<String, Task> = CacheMono.MonoCacheReader { key ->
        Mono.justOrEmpty(cacheManager.getCache("tasks")?.get(key)?.get() as? Signal<Task>)
    }

    // Write the emitted Signal<Task> into the "tasks" cache
    fun writer(): CacheMono.MonoCacheWriter<String, Task> = CacheMono.MonoCacheWriter { key, signal ->
        Mono.fromRunnable { cacheManager.getCache("tasks")?.put(key, signal) }
    }
}

Note: Reactor addons cache their own abstraction, which is Signal<T>, so do not worry about that and just follow that convention.
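For completeness, a hypothetical usage sketch (taskService and the id "42" are assumptions for illustration): the first subscription misses the cache, queries MongoDB and writes the resulting Signal<Task>; subsequent subscriptions for the same id are answered by the reader.

taskService.get("42").subscribe { println(it) } // miss: reader is empty, Mongo is queried, writer stores the Signal
taskService.get("42").subscribe { println(it) } // hit: served straight from the "tasks" cache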

Oleh Dokuka
  • 11,613
  • 5
  • 40
  • 65
  • 1
    Thanks for valuable hints but the question still stands: is serializing and caching Mono object itself something risky or considered as bad practice? I would like to use @Cacheable in combination with Redis to move cache outside of the application memory. – Tomek Zaremba Jan 08 '18 at 19:35
  • Unfortunately, the better way is to integrate with Redis manually via the second approach, in your case using Spring Data Redis – Oleh Dokuka Jan 08 '18 at 19:42
  • 3
    the "reactor addons way" above needs to be integrated to `@Cacheable` at some point in the future to cache the result held by the `Mono`. Caching a `Mono` instance itself doesn't make sense, no more than trying to cache a plain `Runnable` or a `Future` – Simon Baslé Jan 08 '18 at 19:45
  • I have question about Reactor Addons way. What about multithreading? If we have 2 threads that want to get data from cache and cache is empty - we will get 2 calls to db? – SoulCub Mar 04 '19 at 08:54
  • 2
    @SoulCub at the moment there is no additional synchronization between callers, so two calls to the DB may happen. Therefore you have to multiplex the calls in order to avoid the race. I will add a sample to the answer – Oleh Dokuka Mar 04 '19 at 09:41
  • @OlehDokuka I guess since blocking CacheManager operations are wrapped in Mono, we should be careful on which Scheduler this operates (eg. elastic()), right ? – Christophe Bornet Jul 03 '19 at 05:44
  • @ChristopheBornet that is true. It is recommended to use non-blocking caches if possible (e.g. https://lettuce.io/) or to take additional care to run the task on a dedicated Scheduler – Oleh Dokuka Jul 08 '19 at 20:11
  • 1
    Do you know if the `@Cacheable` and `.cache()` solution leaks memory? @Ilker below recommends using `.cache(ttl)` with a ttl ≥ that of the cache configuration, if I understand correctly. Do you know if that is needed? – Tobia Oct 18 '19 at 10:10
  • @OlehDokuka, is there any simple hack for Kotlin coroutines(suspend functions)? – Eugene Maksymets Mar 19 '21 at 12:17
  • for the first approach I have found you also need the annotation @CacheConfig(cacheNames = ....) above the class so the cache actually updates the previous value after getting the result when configured with a time to live, thanks for the helpful hack – Sebastian Zapata Sep 17 '21 at 18:59
  • 1
    There is a serious flaw in the Hack way. If there is an error in the Mono (in this case, an exception thrown inside taskRepository.findById(id)), it will be cached and emitted for as long as the cache entry is valid. – Piotr May 29 '22 at 18:54
  • @OlehDokuka with the Hack Way solution I obtain "DefaultSerializer requires a Serializable payload but received an object of type [reactor.core.publisher.MonoCacheTime]" (project reactor), maybe i'm missing something ? – obe6 Feb 14 '23 at 11:44
  • And of course this does not work if you use Redis, because Mono is not serializable – Ilya Lisov Apr 04 '23 at 13:43
3

I have used Oleh Dokuka's hacky solution and it worked great, but there is a catch: you must use a greater Duration in the Flux cache than your @Cacheable cache's time-to-live value. If you don't pass a duration to the Flux cache, it will never be invalidated (the Flux documentation says "Turn this Flux into a hot source and cache last emitted signals for further Subscriber."). So making the Flux cache 2 minutes and the time-to-live 30 seconds can be a valid configuration. If the Ehcache timeout occurs first, then a new Flux cache reference is generated and used.
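A minimal sketch of that timing, assuming a Mono-returning repository as in the question and a "tasks" cache configured elsewhere (e.g. in Ehcache) with a 30-second time-to-live; the numbers are just the example values from this answer, and Duration is java.time.Duration:

@Cacheable("tasks")
fun get(id: String): Mono<Task> =
    taskRepository.findById(id)
        .cache(Duration.ofMinutes(2)) // inner cache must outlive the 30-second cache entry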

  • 3
    Are you saying that if I use `@Cacheable` and `.cache()` it leaks memory? Do I need to explicitly call `.cache(ttl)` with a ttl ≥ that of the cache configuration? – Tobia Oct 18 '19 at 10:08
-2

// In a Facade:

public Mono<HybrisResponse> getProducts(HybrisRequest request) {
    return Mono.just(HybrisResponse.builder().build());
}

// In a service layer:

@Cacheable(cacheNames = "embarkations")
public HybrisResponse cacheable(HybrisRequest request) {
    LOGGER.info("executing cacheable");
    return null;
}

@CachePut(cacheNames = "embarkations")
public HybrisResponse cachePut(HybrisRequest request) {
    LOGGER.info("executing cachePut");
    return hybrisFacade.getProducts(request).block();
}

// In a Controller:

HybrisResponse hybrisResponse = null;

try {
   // get from cache
   hybrisResponse = productFeederService.cacheable(request);

} catch (Throwable e) {
   // if not in cache then cache it
   hybrisResponse = productFeederService.cachePut(request);
}

return Mono.just(hybrisResponse)
    .map(result -> ResponseBody.<HybrisResponse>builder()
        .payload(result).build())
    .map(ResponseEntity::ok);
Developer
  • 15
  • 4