Hello, I have an async controller that calls an async service method to retrieve users from an API. I want to access the userToken, which is stored in the request attributes, and use it to decrypt the encryptedId. Following the answer from here, I was able to access the user token from ThreadLocalData.
However, once I make my controller asynchronous as well, this no longer works: the token comes back empty inside the service method.
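To show the symptom outside Spring, here is a minimal sketch using only java.util.concurrent. The class ThreadLocalDemo and its method names are hypothetical and not part of my project; the sketch only shows that a ThreadLocal value set on one thread is not visible on a pool thread unless it is captured before submitting and set again on the worker.

```java
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalDemo {
    // Hypothetical stand-in for ThreadLocalData: one value slot per thread.
    private static final ThreadLocal<String> TOKEN = new ThreadLocal<>();

    // Sets a token on the calling thread, then reads the slot from a pool thread.
    public static Optional<String> readOnWorker() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            TOKEN.set("token-from-request-thread");
            // The lambda runs on the pool thread, which has its own empty slot.
            return pool.submit(() -> Optional.ofNullable(TOKEN.get())).get();
        } finally {
            TOKEN.remove();
            pool.shutdown();
        }
    }

    // Same as above, but the value is captured before submitting and set again on the worker.
    public static Optional<String> readOnWorkerWithCapture() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            TOKEN.set("token-from-request-thread");
            String captured = TOKEN.get(); // read on the submitting thread
            return pool.submit(() -> {
                TOKEN.set(captured);
                try {
                    return Optional.ofNullable(TOKEN.get());
                } finally {
                    TOKEN.remove(); // do not leak the value into the next pooled task
                }
            }).get();
        } finally {
            TOKEN.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readOnWorker().isPresent());            // prints false
        System.out.println(readOnWorkerWithCapture().isPresent()); // prints true
    }
}
```

The second method is roughly what I expect the task decorator in my configuration to do for me.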
MyController.java
@Async("taskExecutor")
@GetMapping(value = "/users")
public Callable<ResponseEntity<GetUserResponse>> getCashFlowTotals(
        final UserToken accessToken,
        @RequestParam MultiValueMap<String, String> params
) throws ExecutionException, InterruptedException, JsonProcessingException {
    // Start one asynchronous lookup per encrypted ID.
    List<CompletableFuture<GetUserResponse>> futures = new ArrayList<>();
    params.get("encryptedIds").forEach(param -> {
        futures.add(userService.getUserDetails(param));
    });
    // Combine the individual futures into one that completes when all lookups are done.
    CompletableFuture<List<GetUserResponse>> userCompletableFuture = CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    return () -> ResponseEntity.ok(GetUserResponse.builder().user(userCompletableFuture.join()).build());
}
UserService.java
@Async("taskExecutor")
public CompletableFuture<GetUserResponse> getUserDetails(String encryptedIds) {
    Request request = requestFactory.createUserRequest(encryptedIds);
    // Retrieve user from an API
    Optional<UserToken> userToken = ThreadLocalData.getUserToken(); // This is where it returns empty.
    return CompletableFuture.completedFuture(this.formatResponse(userList, encryptedIds));
}
ThreadLocalData.java
public class ThreadLocalData {
    private ThreadLocalData() {}

    // Per-thread copy of the request attributes; starts as an empty map on each thread.
    private static final ThreadLocal<Map<String, Object>> ctx = ThreadLocal.withInitial(
        HashMap::new
    );

    public static Map<String, Object> getContext() {
        return ctx.get();
    }

    public static void setContext(Map<String, Object> map) {
        ctx.set(map);
    }

    public static void removeContext() {
        ctx.remove();
    }

    // Returns the token stored for the current thread, or empty if none was copied over.
    public static Optional<UserToken> getUserToken() {
        if (ctx.get().containsKey("__access_token__")) {
            return Optional.ofNullable((UserToken) ctx.get().get("__access_token__"));
        }
        return Optional.empty();
    }
}
Configuration.java
@Bean(name = "taskExecutor")
public Executor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setTaskDecorator(runnable -> {
        // Runs on the thread that submits the task: snapshot its request-scoped attributes.
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        Map<String, Object> map;
        if (requestAttributes != null) {
            map = Arrays.stream(requestAttributes.getAttributeNames(RequestAttributes.SCOPE_REQUEST))
                .collect(Collectors.toMap(
                    r -> r,
                    r -> requestAttributes.getAttribute(r, RequestAttributes.SCOPE_REQUEST)
                ));
        } else {
            map = new HashMap<>();
        }
        // Runs on the pool thread: expose the snapshot, then clean up afterwards.
        return () -> {
            try {
                ThreadLocalData.setContext(map);
                runnable.run();
            } finally {
                ThreadLocalData.removeContext();
            }
        };
    });
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(15);
    executor.setQueueCapacity(100);
    executor.setThreadNamePrefix("asyncThread-");
    executor.initialize();
    return executor;
}
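One thing that may be relevant, though I have not confirmed that it is what happens in my application: the decorator reads its source context on whichever thread submits the task. The sketch below is plain Java with hypothetical names (NestedDecoratorDemo, REQUEST as a stand-in for RequestContextHolder, COPIED as a stand-in for ThreadLocalData). It shows that a task submitted from inside an already decorated task is decorated on a worker thread, where the source context was never set.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NestedDecoratorDemo {
    // Hypothetical stand-in for RequestContextHolder: only populated on the request thread.
    static final ThreadLocal<String> REQUEST = new ThreadLocal<>();
    // Hypothetical stand-in for ThreadLocalData: populated by the decorator on worker threads.
    static final ThreadLocal<String> COPIED = new ThreadLocal<>();

    // Mirrors the shape of the task decorator: capture on the submitting thread, restore on the worker.
    static Runnable decorate(Runnable task) {
        String captured = REQUEST.get(); // evaluated by whichever thread submits the task
        return () -> {
            try {
                COPIED.set(captured);
                task.run();
            } finally {
                COPIED.remove();
            }
        };
    }

    // Returns what the outer task and the nested task each see in COPIED.
    public static List<String> run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Executor decorated = task -> pool.execute(decorate(task));
        try {
            REQUEST.set("token");
            List<String> seen = new ArrayList<>();
            CompletableFuture<Void> outer = CompletableFuture.runAsync(() -> {
                seen.add("outer=" + COPIED.get());
                // Nested submit: decorate() now runs on a worker thread, where REQUEST is empty.
                CompletableFuture.runAsync(() -> seen.add("nested=" + COPIED.get()), decorated).join();
            }, decorated);
            outer.join();
            return seen;
        } finally {
            REQUEST.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [outer=token, nested=null]
    }
}
```

In this sketch the outer task plays the role of my async controller and the nested task plays the role of getUserDetails.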
Does anyone know what I missed, or how I can get this working?