Here is a sample application I wrote to demonstrate the problem. Below is a REST endpoint that receives a request and performs some operations on a different thread. As you can see, the traceId is not copied over once execution moves to the new thread (see the log extract below). What is the solution to this problem? I am looking for existing utilities that handle this automatically, for example what Spring offers: https://www.baeldung.com/spring-cloud-sleuth-single-application#4-spanning-runnables

GreetingResource.java

    @Inject
    @Channel("words-in-write")
    Emitter<String> emitter;

    private final Executor executor = Executors.newFixedThreadPool(1);

    @GET
    @Path("{primeNumber}")
    @Produces(MediaType.TEXT_PLAIN)
    public String checkPrimeAndPublish(@PathParam("primeNumber") String primeNumber) {
        log.info("Received request : {}", primeNumber);
        var val = Uni.createFrom().item(primeNumber)
            .emitOn(executor)
            .onItem().transformToUni(s -> {
                log.info("Converting to Long : {}", s);
                return Uni.createFrom().item(Long.valueOf(s));
            })
            .onItem().transformToUni(aLong -> {
                log.info("Checking if prime : {}", aLong);
                return Uni.createFrom().item(isPrime(aLong));
            });

        val.invoke(isPrime -> {
            if(isPrime){
                log.info("Begin : prime provided {}, sending to kafka : {}", primeNumber, primeNumber + " is a prime");
                emitter.send(KafkaRecord.of(primeNumber, primeNumber + " is a prime"));
            } else {
                log.info("Begin : prime not provided {}, sending to kafka : {}", primeNumber, primeNumber + " is not a prime");
                emitter.send(KafkaRecord.of(primeNumber, primeNumber + " is not a prime"));
            }
        }).await().indefinitely();

        return "Done";
    }

Logs:

13:20:06 INFO  traceId=e80b6aff03ae74b1ee04a1601bfc2105, parentId=, spanId=8c0a6a423d8c1052 [or.ac.GreetingResource] (executor-thread-0) Received request : 7
13:20:06 INFO  traceId=, parentId=, spanId= [or.ac.GreetingResource] (pool-12-thread-1) Converting to Long : 7
13:20:06 INFO  traceId=, parentId=, spanId= [or.ac.GreetingResource] (pool-12-thread-1) Checking if prime : 7
13:20:06 INFO  traceId=, parentId=, spanId= [or.ac.GreetingResource] (pool-12-thread-1) Begin : prime provided 7, sending to kafka : 7 is a prime

Kiran K
  • How do you think this could possibly work? If you're manually creating a thread pool that Quarkus knows nothing about and manually offloading tasks to that thread pool, you should also manually propagate the necessary context. – Ladicek Jul 10 '23 at 13:24
  • What's the right way to do this kind of thing? Are there any ways of creating a thread where this can happen automatically? For example, Spring has utilities for this: https://www.baeldung.com/spring-cloud-sleuth-single-application#4-spanning-runnables – Kiran K Jul 10 '23 at 13:37
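
To illustrate the point made in the first comment, here is a minimal plain-JDK sketch of why a thread-local value is not visible on a hand-made thread pool, and what manual propagation looks like. It does not use Quarkus or Mutiny. The `TRACE_ID` thread-local, the `withTraceId` wrapper and the value "trace-123" are hypothetical names made up for this illustration; they only stand in for whatever thread-local storage the logging and tracing layers actually use.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class ContextPropagationSketch {

    // Hypothetical stand-in for the thread-local context that carries the trace id.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Captures the caller's trace id when the wrapper is created and
    // restores it on whichever thread later runs the task.
    static <T> Supplier<T> withTraceId(Supplier<T> task) {
        final String captured = TRACE_ID.get();
        return () -> {
            final String previous = TRACE_ID.get();
            TRACE_ID.set(captured);
            try {
                return task.get();
            } finally {
                // Put the worker thread back the way it was, so a pooled
                // thread does not leak this request's id into the next task.
                if (previous == null) {
                    TRACE_ID.remove();
                } else {
                    TRACE_ID.set(previous);
                }
            }
        };
    }

    // Sets a trace id on the calling thread, runs a task on a separate
    // single-thread pool, and returns the trace id the task observed.
    static String traceIdSeenOnWorker(boolean propagate) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        TRACE_ID.set("trace-123");
        try {
            Supplier<String> readTraceId = () -> {
                String id = TRACE_ID.get();
                return id == null ? "<none>" : id;
            };
            Supplier<String> task = propagate ? withTraceId(readTraceId) : readTraceId;
            Callable<String> callable = task::get;
            return pool.submit(callable).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("without propagation: " + traceIdSeenOnWorker(false));
        System.out.println("with propagation: " + traceIdSeenOnWorker(true));
    }
}
```

The unwrapped task sees no trace id because the pool thread has its own, empty thread-local slot, which matches the empty traceId fields in the log extract above. Wrapping every task by hand like this is easy to forget, which is why a framework-managed executor that does the capture and restore itself is usually the more convenient option.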

1 Answer

The easiest way to achieve this is explained in the Quarkus context propagation guide: https://quarkus.io/guides/context-propagation

Following the guide, I added this dependency to my build.gradle:

implementation 'io.quarkus:quarkus-smallrye-context-propagation'

I also changed the Mutiny code to use the default worker thread pool instead of my own executor:

.emitOn(Infrastructure.getDefaultWorkerPool())

After this change, the logs look like this:

15:01:59 INFO  traceId=2365984e0fb31fad66967ae302202b83, parentId=, spanId=927616b96dde9982 [or.ac.GreetingResource] (executor-thread-0) Received request : 7
15:01:59 INFO  traceId=2365984e0fb31fad66967ae302202b83, parentId=, spanId=927616b96dde9982 [or.ac.GreetingResource] (executor-thread-1) Converting to Long : 7
15:01:59 INFO  traceId=2365984e0fb31fad66967ae302202b83, parentId=, spanId=927616b96dde9982 [or.ac.GreetingResource] (executor-thread-1) Checking if prime : 7
15:01:59 INFO  traceId=2365984e0fb31fad66967ae302202b83, parentId=, spanId=927616b96dde9982 [or.ac.GreetingResource] (executor-thread-1) Begin : prime provided 7, sending to kafka : 7 is a prime
15:01:59 INFO  traceId=2365984e0fb31fad66967ae302202b83, parentId=, spanId=927616b96dde9982 [io.qu.ht.access-log] (executor-thread-0) "127.0.0.1 Host: localhost:9099

Kiran K