
Hello, I have an async controller that calls an async service method to retrieve users from an API. I want to access the userToken, which is stored in the request attributes, and use it to decrypt the encryptedId. Following the answer from here, I was able to read the user token from ThreadLocalData; however, once I make my controller asynchronous, it no longer works.

MyController.java

@Async("taskExecutor")
@GetMapping(value = "/users")
public Callable<ResponseEntity<GetUserResponse>> getCashFlowTotals(
        final UserToken accessToken,
        @RequestParam MultiValueMap<String,String> params
) throws ExecutionException, InterruptedException, JsonProcessingException {
    
    List<CompletableFuture<GetUserResponse>> futures = new ArrayList<>();
    params.get("encryptedIds").forEach(param -> {
        futures.add(userService.getUserDetails(param));
    });

    CompletableFuture<List<GetUserResponse>> userCompletableFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).thenApply(v -> {
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    });

    return () -> ResponseEntity.ok(GetUserResponse.builder().user(userCompletableFuture.join()).build());
}

UserService.java

@Async("taskExecutor")
public GetUserResponse getUserDetails(String encryptedIds) {
    Request request = requestFactory.createUserRequest(encryptedIds);

    //Retrieve user from an API

    Optional<UserToken> userToken = ThreadLocalData.getUserToken(); // This is where it returns empty.

    return this.formatResponse(userList, encryptedIds);
}

ThreadLocalData.java

public class ThreadLocalData {
    private ThreadLocalData() {}

    // One context map per thread; a thread that never called setContext sees an empty map.
    private static final ThreadLocal<Map<String, Object>> ctx = ThreadLocal.withInitial(
            HashMap::new
    );

    public static Map<String, Object> getContext() {
        return ctx.get();
    }

    public static void setContext(Map<String, Object> map) {
        ctx.set(map);
    }

    public static void removeContext() {
        ctx.remove();
    }

    // Returns the token stored under "__access_token__" for the current thread, if any.
    public static Optional<UserToken> getUserToken() {
        if (ctx.get().containsKey("__access_token__")) {
            return Optional.ofNullable((UserToken) ctx.get().get("__access_token__"));
        }

        return Optional.empty();
    }
}

Configuration.java

@Bean(name = "taskExecutor")
public Executor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setTaskDecorator(runnable -> {
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        Map<String, Object> map;

        if (requestAttributes != null) {
            map = Arrays.stream(requestAttributes.getAttributeNames(0))
                            .collect(Collectors.toMap(
                                    r -> r,
                                    r -> requestAttributes.getAttribute(r, 0)
                            ));
        } else {
            map = new HashMap<>();
        }

        return () -> {
            try {
                ThreadLocalData.setContext(map);
                runnable.run();
            } finally {
                ThreadLocalData.removeContext();
            }
        };
    });
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(15);
    executor.setQueueCapacity(100);
    executor.setThreadNamePrefix("asyncThread-");
    executor.initialize();

    return executor;
}

Does anyone have an idea what I missed, or how I could get this working?

  • Well, the name is a giveaway: it is a `ThreadLocal`, so it is bound to a thread. If you start or use another thread, that thread doesn't have access to it, because it is a different thread. A `ThreadLocal` is basically a `Map` with the `Thread` as the key. The problem is your completable futures: those already run on a thread before accessing the request, hence there is nothing. Instead, let the `@Async` method return a `CompletableFuture` and use those. – M. Deinum Jul 26 '23 at 06:32
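
To illustrate the point in the comment above, here is a minimal plain-Java sketch (no Spring) of why a `ThreadLocal` value is invisible on a pool thread, and how capturing it on the submitting thread makes it available again. The class name `ContextPropagationSketch`, the `TOKEN` holder, the `withContext` helper and the value `"token-123"` are all hypothetical names made up for this example; they are not part of the question's code or of any library. The wrapper plays roughly the role a task decorator would, under the assumption that it is invoked on the thread that still holds the value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class ContextPropagationSketch {
    // Hypothetical stand-in for ThreadLocalData: one value per thread.
    static final ThreadLocal<String> TOKEN = new ThreadLocal<>();

    // Captures the caller's value now, and restores it on whichever thread runs the task.
    static <T> Supplier<T> withContext(Supplier<T> task) {
        String captured = TOKEN.get(); // read on the submitting thread
        return () -> {
            TOKEN.set(captured);       // set on the worker thread
            try {
                return task.get();
            } finally {
                TOKEN.remove();        // avoid leaking into the next pooled task
            }
        };
    }

    // Returns {value seen without propagation, value seen with propagation}.
    static String[] demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            TOKEN.set("token-123"); // set on the calling thread only

            String plain = CompletableFuture
                    .supplyAsync(() -> String.valueOf(TOKEN.get()), pool)
                    .join();

            String decorated = CompletableFuture
                    .supplyAsync(withContext(() -> String.valueOf(TOKEN.get())), pool)
                    .join();

            return new String[] {plain, decorated};
        } finally {
            TOKEN.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] result = demo();
        System.out.println("without propagation: " + result[0]);
        System.out.println("with propagation:    " + result[1]);
    }
}
```

The capture happens when `withContext` is called, not when the task runs, so it only helps if that call is made on a thread that already has the value. If the calling code is itself running on a pool thread where nothing was set, there is nothing to capture.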
