Integrating Caffeine async cache loader with reactive data source
Published:
This post assumes you know the basic concepts and usage of Project Reactor
I recently had a task to add Caffeine cache in a WebFlux service. Since everything is async in the service, it was natural to think of Caffeine’s async cache loader. It took me some time to get it work.
Here’s what I got to started with, as well as requirements:
- The data source of the cache is a reactive Feign client. Thus the api gives a
Mono
orFlux
. - Caffeine async cache loader works in the world of
CompletableFuture
. - The business logic level is all reactive.
So, you might already see the problem: How to do the transform of Mono/Flux -> CompletableFuture -> Mono/Flux while integrating with the async cache loader?
[BAD] Blocking?
At first, I thought “How hard it could be? I’ve used Caffeine cache loader quite a few times.” And I came up with the following code.
@Bean
public AsyncLoadingCache<UUID, List<DataObject>> asyncCache() {
return Caffeine.newBuilder()
.expireAfterWrite(5, TimeUnit.MINUTES)
.maximumSize(500)
.buildAsync(key -> reactiveFeignClient.getDataById(key)
.blockOptional()
.orElse(List.of()));
}
Well, though it’s straightforward and easy to implement, it’s a really bad code because of the use of blockOptional
. It’s quite obvious that blocking is never the right thing to do in reactive world.
[GOOD] True async implementation
@Bean
public AsyncLoadingCache<UUID, List<DataObject>> asyncCache() {
return Caffeine.newBuilder()
.expireAfterWrite(5, TimeUnit.MINUTES)
.maximumSize(500)
.buildAsync((key, executor) -> reactiveFeignClient.getDataById(key)
.subscribeOn(Schedulers.fromExecutor(executor))
.toFuture());
}
Instead of blocking the reactor, subscribe it on the cache loader’s own thread by subscribeOn(Schedulers.fromExecutor(executor))
.
On the business logic level, do the following to transform the cache back to Mono.
Mono.fromFuture(cache.get(key));
[EVEN BETTER] Abstracted cache layer
Let’s make the cache layer generic so that it can get plugged in anywhere.
public class AsyncCacheService<K, V> {
private final AsyncLoadingCache<K, V> cache;
/**
* Constructor. Build the cache loader.
*
* @param duration Caffeine expireAfterWrite value.
* @param fn Cache data loading function.
*/
public AsyncCacheService(Duration expireAfterWrite, Function<K, Mono<V>> fn) {
cache = Caffeine.newBuilder()
.expireAfterWrite(expireAfterWrite)
.buildAsync((key, executor) -> fn.apply(key)
.subscribeOn(Schedulers.fromExecutor(executor))
.toFuture());
}
public Mono<V> get(K k) {
return Mono.fromFuture(cache.get(k));
}
}