4

Spring Boot 3 has changed context propagation in tracing. https://github.com/micrometer-metrics/tracing/wiki/Spring-Cloud-Sleuth-3.1-Migration-Guide#async-instrumentation

They now provide a library for this, but I guess I don't quite understand how it works. I have created a taskExecutor as described in the guide.

@Bean(name = "taskExecutor")
    ThreadPoolTaskExecutor threadPoolTaskScheduler() {
        // Anonymous subclass whose only customization is to hand back the underlying
        // ExecutorService wrapped in a context-capturing ContextExecutorService.
        // NOTE(review): ThreadPoolTaskExecutor appears to dispatch execute/submit to its
        // internal ThreadPoolExecutor rather than to the ExecutorService returned here, which
        // would bypass this wrapper -- confirm against the Spring version in use.
        final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() {
            @Override
            protected ExecutorService initializeExecutor(
                    ThreadFactory threadFactory,
                    RejectedExecutionHandler rejectedExecutionHandler) {
                return ContextExecutorService.wrap(
                        super.initializeExecutor(threadFactory, rejectedExecutionHandler),
                        ContextSnapshot::captureAll);
            }
        };
        // Explicit initialization before the instance is returned as the bean.
        executor.initialize();
        return executor;
    }

And I have marked @Async like this:

 // Marked @Async with the qualifier "taskExecutor", i.e. the executor bean of that name is
 // requested for running this method; the body is a placeholder for the real service call.
 @Async("taskExecutor")
    public void run() {
        // invoke some service
    }

But the tracing context is not propagated to the child context in the taskExecutor thread.

user2590727
  • 441
  • 1
  • 6
  • 25

3 Answers

6

You can autowire your ThreadPoolTaskExecutor and wrap it for context propagation in an AsyncConfigurer.

import io.micrometer.context.ContextExecutorService;
import io.micrometer.context.ContextSnapshot;
import java.util.concurrent.Executor;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * Exposes the injected {@link ThreadPoolTaskExecutor}, wrapped in a context-capturing
 * {@link ContextExecutorService}, as the executor returned from
 * {@link AsyncConfigurer#getAsyncExecutor()}.
 */
@Configuration(proxyBeanMethods = false)
public class AsyncTraceContextConfig implements AsyncConfigurer {

  // NOTE: By design you can only have one AsyncConfigurer, thus only one executor pool is
  // configurable.
  private final ThreadPoolTaskExecutor taskExecutor;

  /**
   * Creates the configuration around the given executor.
   *
   * <p>The constructor is written out by hand (instead of being generated by Lombok) so that
   * {@code @Qualifier} sits on the constructor parameter. A qualifier placed on the field is
   * not copied to a Lombok-generated constructor unless Lombok is explicitly configured to do
   * so, which leaves the injection point unqualified.
   *
   * @param taskExecutor the executor bean named {@code taskExecutor}; the qualifier matters
   *     only if you have more than one task executor pool
   */
  public AsyncTraceContextConfig(@Qualifier("taskExecutor") ThreadPoolTaskExecutor taskExecutor) {
    this.taskExecutor = taskExecutor;
  }

  /**
   * Returns the injected pool's underlying thread pool wrapped so that a
   * {@link ContextSnapshot} is captured via {@code ContextSnapshot::captureAll} for submitted
   * tasks.
   *
   * @return the context-propagating executor
   */
  @Override
  public Executor getAsyncExecutor() {
    return ContextExecutorService.wrap(
        taskExecutor.getThreadPoolExecutor(), ContextSnapshot::captureAll);
  }
}

UPDATE

If you have more than one executor pool and want to add tracing to all of them, use the TaskDecorator with ContextSnapshot.wrap():

import io.micrometer.context.ContextSnapshot;
import java.util.concurrent.Executor;
import org.springframework.boot.task.TaskExecutorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskDecorator;

/**
 * Declares two executor pools that share a single context-propagating {@link TaskDecorator}.
 */
@Configuration
public class AsyncConfig {

  /**
   * Decorator that captures a {@link ContextSnapshot} each time it is invoked and wraps the
   * given runnable with it.
   */
  @Bean
  public TaskDecorator otelTaskDecorator() {
    return task -> ContextSnapshot.captureAll().wrap(task);
  }

  /** First pool; threads are named with the {@code threadPoolExecutor1-} prefix. */
  @Bean("asyncExecutorPool1")
  public Executor asyncExecutorPool1(TaskDecorator otelTaskDecorator) {
    return decoratedPool("threadPoolExecutor1-", otelTaskDecorator);
  }

  /** Second pool; threads are named with the {@code threadPoolExecutor2-} prefix. */
  @Bean("asyncExecutorPool2")
  public Executor asyncExecutorPool2(TaskDecorator otelTaskDecorator) {
    return decoratedPool("threadPoolExecutor2-", otelTaskDecorator);
  }

  /**
   * Builds a pool with the sizing shared by both beans (core 5, max 10, queue 10), differing
   * only in thread-name prefix.
   */
  private static Executor decoratedPool(String threadNamePrefix, TaskDecorator decorator) {
    TaskExecutorBuilder builder =
        new TaskExecutorBuilder()
            .corePoolSize(5)
            .maxPoolSize(10)
            .queueCapacity(10)
            .threadNamePrefix(threadNamePrefix)
            .taskDecorator(decorator);
    return builder.build();
  }
}

NOTE: You can follow this blog for more setup details and a sample GitHub project.

Amith Kumar
  • 4,400
  • 1
  • 21
  • 28
0

I was facing the same problem. Please add this code to your configuration and everything should work as expected.

  /**
   * Supplies context-propagating executors for both {@code AsyncConfigurer} and the
   * {@code WebMvcConfigurer} async-support hook.
   */
  @Configuration(proxyBeanMethods = false)
  static class AsyncConfig implements AsyncConfigurer, WebMvcConfigurer {

    /** Returns a cached thread pool wrapped so that a context snapshot is captured for tasks. */
    @Override
    public Executor getAsyncExecutor() {
      var cachedPool = Executors.newCachedThreadPool();
      return ContextExecutorService.wrap(cachedPool, ContextSnapshot::captureAll);
    }

    /**
     * Installs a {@code SimpleAsyncTaskExecutor} whose thread factory wraps each runnable with
     * a snapshot captured at the moment the thread is created.
     */
    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
      configurer.setTaskExecutor(
          new SimpleAsyncTaskExecutor(
              task -> {
                Runnable withContext = ContextSnapshot.captureAll().wrap(task);
                return new Thread(withContext);
              }));
    }
  }
Sesha
  • 565
  • 1
  • 7
  • 20
0

You can try this way, but I suspect it's a bug.

// Anonymous ThreadPoolTaskExecutor subclass that wraps the underlying ExecutorService and,
// in addition, wraps every task handed to execute/submit with a freshly captured snapshot.
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor() {
    @Override
    protected ExecutorService initializeExecutor(
            ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
        return ContextExecutorService.wrap(
                super.initializeExecutor(threadFactory, rejectedExecutionHandler),
                ContextSnapshot::captureAll);
    }

    @Override
    public void execute(Runnable task) {
        // Capture at the call site, then delegate with the wrapped task.
        ContextSnapshot snapshot = ContextSnapshot.captureAll();
        super.execute(snapshot.wrap(task));
    }

    @Override
    public Future<?> submit(Runnable task) {
        ContextSnapshot snapshot = ContextSnapshot.captureAll();
        return super.submit(snapshot.wrap(task));
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        ContextSnapshot snapshot = ContextSnapshot.captureAll();
        return super.submit(snapshot.wrap(task));
    }
};
ethan
  • 1
  • 1