4

I tried to use spring cloud stream with kafka binder. But when I called WebClient in chain, then trace id is lost.

My flow is 'external service' -> 'functionStream-in' -> 'http call' -> functionStream-out' -> 'testStream-in' -> 'testStream-out' -> 'external service'

But after http call(or not?) the trace id is not propagated and I don't understand why. If I remove http call, then everything is OK.

I tried to add Hooks.enableAutomaticContextPropagation();, but that didn't help. I tried to add ContextSnapshot.setThreadLocalsFrom around http call - same thing.

How can I solve it?

Dependencies:

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
    
    implementation 'io.micrometer:micrometer-tracing-bridge-brave'
    implementation 'io.zipkin.reporter2:zipkin-reporter-brave'
    
    implementation "io.projectreactor:reactor-core:3.5.3"
    implementation "io.micrometer:context-propagation:1.0.2"
    implementation "io.micrometer:micrometer-core:1.10.4"
    implementation "io.micrometer:micrometer-tracing:1.0.2"
}

application.yml:

spring:
  cloud.stream:
    kafka.binder:
      enableObservation: true
      headers:
        - b3
    function.definition: functionStream;testStream
    default.producer.useNativeEncoding: true
    bindings:
      functionStream-in-0:
        destination: spring-in
        group: spring-test1
      functionStream-out-0:
        destination: test-in
      testStream-in-0:
        destination: test-in
        group: spring-test2
      testStream-out-0:
        destination: spring-out
  integration:
    management:
      observation-patterns: "*"
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

management:
  tracing:
    enabled: true
    sampling.probability: 1.0
    propagation.type: b3
logging.pattern.level: "%5p [%X{traceId:-},%X{spanId:-}]"

Code:

    @Bean
    WebClient webClient(final WebClient.Builder builder) {
        return builder.build();
    }

    @Bean
    Function<Flux<Message<String>>, Flux<Message<String>>> functionStream(final WebClient webClient, final ObservationRegistry registry) {
        return flux -> flux
                .<Message<String>>handle((msg, sink) -> {
                    log.info("functionStream-1");
                    sink.next(msg);
                })
                .flatMap(msg -> webClient.get()
                        .uri("http://localhost:8080/test")
                        .exchangeToMono(httpResponse -> httpResponse.bodyToMono(String.class)
                                .map(httpBody -> MessageBuilder.withPayload(httpBody)
                                        .copyHeaders(httpResponse.headers().asHttpHeaders())
                                        .build())
                                .<Message<String>>handle((m, sink) -> {
                                    log.info("functionStream-3");
                                    sink.next(m);
                                })
                        )
                )
                .handle((msg, sink) -> {
                    log.info("functionStream-2");
                    sink.next(msg);
                });
    }

    @Bean
    Function<Flux<Message<String>>, Flux<Message<String>>> testStream(final ObservationRegistry registry) {
        return flux -> flux
                .publishOn(Schedulers.boundedElastic())
                .<Message<String>>handle((msg, sink) -> {
                    log.info("testStream-1");
                    sink.next(msg);
                })
                .map(msg -> MessageBuilder
                        .withPayload(msg.getPayload())
                        .copyHeaders(msg.getHeaders())
                        .build());
    }

    @Bean
    RouterFunction<ServerResponse> router(final ObservationRegistry registry) {
        return route()
                .GET("/test", r -> ServerResponse.ok().body(Mono.deferContextual(contextView -> {
                    try (final var scope = ContextSnapshot.setThreadLocalsFrom(contextView, ObservationThreadLocalAccessor.KEY)) {
                        log.info("GET /test");
                    }
                    return Mono.just("answer");
                }), String.class))
                .build();
    }

With this code I have output:

2023-02-16T17:06:22.111  INFO [63ee385de15f1061dea076eb06b0d1e0,39a60588a695a702] 220348 --- [container-0-C-1] com.example.demo.TestApplication         : functionStream-1
2023-02-16T17:06:22.166  WARN [63ee385de15f1061dea076eb06b0d1e0,39a60588a695a702] 220348 --- [container-0-C-1] i.m.o.c.ObservationThreadLocalAccessor   : Scope from ObservationThreadLocalAccessor [null] is not the same as the one from ObservationRegistry [io.micrometer.observation.SimpleObservation$SimpleScope@523fe6a9]. You must have created additional scopes and forgotten to close them. Will close both of them
2023-02-16T17:06:22.170  WARN [63ee385de15f1061dea076eb06b0d1e0,de5d233d531b10f7] 220348 --- [container-0-C-1] i.m.o.c.ObservationThreadLocalAccessor   : Scope from ObservationThreadLocalAccessor [null] is not the same as the one from ObservationRegistry [io.micrometer.observation.SimpleObservation$SimpleScope@545339d8]. You must have created additional scopes and forgotten to close them. Will close both of them
2023-02-16T17:06:22.187  WARN [63ee385de15f1061dea076eb06b0d1e0,de5d233d531b10f7] 220348 --- [container-0-C-1] i.m.o.c.ObservationThreadLocalAccessor   : Scope from ObservationThreadLocalAccessor [null] is not the same as the one from ObservationRegistry [io.micrometer.observation.SimpleObservation$SimpleScope@44400bcc]. You must have created additional scopes and forgotten to close them. Will close both of them
2023-02-16T17:06:22.361  INFO [63ee385de15f1061dea076eb06b0d1e0,908f48f8485a4277] 220348 --- [ctor-http-nio-4] com.example.demo.TestApplication         : GET /test
2023-02-16T17:06:22.407  INFO [,] 220348 --- [ctor-http-nio-3] com.example.demo.TestApplication         : functionStream-3
2023-02-16T17:06:22.409  INFO [,] 220348 --- [ctor-http-nio-3] com.example.demo.TestApplication         : functionStream-2
2023-02-16T17:06:22.448  INFO [63ee385eda64dcebdd1b0fd86a6c39ca,dd1b0fd86a6c39ca] 220348 --- [ctor-http-nio-3] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
2023-02-16T17:06:22.456  INFO [63ee385eda64dcebdd1b0fd86a6c39ca,dd1b0fd86a6c39ca] 220348 --- [ctor-http-nio-3] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.3.2
2023-02-16T17:06:22.457  INFO [63ee385eda64dcebdd1b0fd86a6c39ca,dd1b0fd86a6c39ca] 220348 --- [ctor-http-nio-3] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: b66af662e61082cb
2023-02-16T17:06:22.457  INFO [63ee385eda64dcebdd1b0fd86a6c39ca,dd1b0fd86a6c39ca] 220348 --- [ctor-http-nio-3] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1676556382456
2023-02-16T17:06:22.477  INFO [,] 220348 --- [| adminclient-6] o.a.kafka.common.utils.AppInfoParser     : App info kafka.admin.client for adminclient-6 unregistered
2023-02-16T17:06:22.481  INFO [,] 220348 --- [| adminclient-6] o.apache.kafka.common.metrics.Metrics    : Metrics scheduler closed
2023-02-16T17:06:22.481  INFO [,] 220348 --- [| adminclient-6] o.apache.kafka.common.metrics.Metrics    : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2023-02-16T17:06:22.481  INFO [,] 220348 --- [| adminclient-6] o.apache.kafka.common.metrics.Metrics    : Metrics reporters closed
2023-02-16T17:06:22.512  INFO [63ee385eda64dcebdd1b0fd86a6c39ca,b5babc6bef4e30ca] 220348 --- [oundedElastic-1] com.example.demo.TestApplication         : testStream-1
2023-02-16T17:06:22.539  INFO [63ee385eda64dcebdd1b0fd86a6c39ca,30126c50752d5928] 220348 --- [oundedElastic-1] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
2023-02-16T17:06:22.543  INFO [63ee385eda64dcebdd1b0fd86a6c39ca,30126c50752d5928] 220348 --- [oundedElastic-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.3.2
2023-02-16T17:06:22.544  INFO [63ee385eda64dcebdd1b0fd86a6c39ca,30126c50752d5928] 220348 --- [oundedElastic-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: b66af662e61082cb
2023-02-16T17:06:22.544  INFO [63ee385eda64dcebdd1b0fd86a6c39ca,30126c50752d5928] 220348 --- [oundedElastic-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1676556382543

Without http call I have output:

2023-02-16T17:03:09.518  INFO [63ee379d924e5645fc1d9e27b8135b48,9ad408700a3b5684] 204228 --- [container-0-C-1] com.example.demo.TestApplication         : functionStream-1
2023-02-16T17:03:09.518  INFO [63ee379d924e5645fc1d9e27b8135b48,9ad408700a3b5684] 204228 --- [container-0-C-1] com.example.demo.TestApplication         : functionStream-2
2023-02-16T17:03:09.615  INFO [63ee379d924e5645fc1d9e27b8135b48,3d4c6bd14a3ca4b6] 204228 --- [container-0-C-1] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
2023-02-16T17:03:09.629  INFO [63ee379d924e5645fc1d9e27b8135b48,3d4c6bd14a3ca4b6] 204228 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.3.2
2023-02-16T17:03:09.629  INFO [63ee379d924e5645fc1d9e27b8135b48,3d4c6bd14a3ca4b6] 204228 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: b66af662e61082cb
2023-02-16T17:03:09.629  INFO [63ee379d924e5645fc1d9e27b8135b48,3d4c6bd14a3ca4b6] 204228 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1676556189628
2023-02-16T17:03:09.691  INFO [,] 204228 --- [| adminclient-6] o.a.kafka.common.utils.AppInfoParser     : App info kafka.admin.client for adminclient-6 unregistered
2023-02-16T17:03:09.693  INFO [,] 204228 --- [| adminclient-6] o.apache.kafka.common.metrics.Metrics    : Metrics scheduler closed
2023-02-16T17:03:09.693  INFO [,] 204228 --- [| adminclient-6] o.apache.kafka.common.metrics.Metrics    : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2023-02-16T17:03:09.693  INFO [,] 204228 --- [| adminclient-6] o.apache.kafka.common.metrics.Metrics    : Metrics reporters closed
2023-02-16T17:03:09.859  INFO [63ee379d924e5645fc1d9e27b8135b48,b92a1a59ffd32d80] 204228 --- [oundedElastic-1] com.example.demo.TestApplication         : testStream-1
2023-02-16T17:03:09.868  INFO [63ee379d924e5645fc1d9e27b8135b48,db97f5eed98602f6] 204228 --- [oundedElastic-1] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
2023-02-16T17:03:09.874  INFO [63ee379d924e5645fc1d9e27b8135b48,db97f5eed98602f6] 204228 --- [oundedElastic-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.3.2
2023-02-16T17:03:09.874  INFO [63ee379d924e5645fc1d9e27b8135b48,db97f5eed98602f6] 204228 --- [oundedElastic-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: b66af662e61082cb
2023-02-16T17:03:09.874  INFO [63ee379d924e5645fc1d9e27b8135b48,db97f5eed98602f6] 204228 --- [oundedElastic-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1676556189874
Roman
  • 115
  • 2
  • 10

0 Answers0