I am in the middle of migrating code from Spring Boot 2.7.7/Spring Cloud 2021.0.5 to Spring Boot 3.0.1/Spring Cloud 2022.0.0. As part of this migration, I am now using io.micrometer:micrometer-tracing-bridge-otel. While my code is functionally working, I have noticed that the traceId is no longer propagated across HTTP REST or Kinesis messaging service boundaries (i.e., my HTTP REST client/server and Kinesis messaging producer/consumer microservices log separate traceIds). Is there an additional dependency, or dependencies, that I need to add to my projects to ensure that the traceId gets propagated across service boundaries?

FYI, with Spring Boot 2.7.7/Spring Cloud 2021.0.5, I was able to propagate the traceId across Kinesis producers and consumers with the following configuration (no separate configuration was needed to propagate the traceId across HTTP REST boundaries). Adding the b3 header was the key to getting traceId propagation working:

spring:
  cloud:
    stream:
      bindings:
        myEvent-out-0:
          content-type: application/*+avro
          destination: my-event
      kinesis:
        binder:
          auto-create-stream: true
          headers:
            - b3
          kpl-kcl-enabled: true

I noticed here it states that "by default we don't support joined spans (this means that when you have e.g. an HTTP span, you will no longer see the same span being there on the client and sender side, you will see two separate spans now)." So, when it states that by default it's not supported, does this mean that this is an optional configuration? This statement confuses me, and I'm wondering why the decision was to not join spans as the default auto-configuration when this is clearly what is needed for log correlation across a distributed architecture.
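For readers puzzled by the same quote, here is a minimal sketch of the difference between a joined span and a non-joined (child) span. The classes `SpanContext` and `SpanJoinDemo` are hypothetical illustrations and are not Micrometer Tracing types. The point it tries to show is that, as far as I understand it, the trace id is carried over in both modes; only the span id handling differs.

```java
// Illustrative model only: these classes are hypothetical, not Micrometer Tracing types.
final class SpanContext {
    final String traceId;   // shared by every span that belongs to one trace
    final String spanId;    // identifies a single span
    final String parentId;  // span id of the caller, or null for a root span

    SpanContext(String traceId, String spanId, String parentId) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.parentId = parentId;
    }
}

final class SpanJoinDemo {

    // Joined span: the server side reuses the client's span id.
    static SpanContext joined(SpanContext client) {
        return new SpanContext(client.traceId, client.spanId, client.parentId);
    }

    // Non-joined span: the server side opens a child span with its own span id.
    static SpanContext child(SpanContext client, String newSpanId) {
        return new SpanContext(client.traceId, newSpanId, client.spanId);
    }

    public static void main(String[] args) {
        SpanContext client =
                new SpanContext("4bf92f3577b34da6a3ce929d0e0e4736", "00f067aa0ba902b7", null);
        SpanContext server = child(client, "b09fdf7c0e74342a");
        System.out.println("same trace: " + server.traceId.equals(client.traceId));
        System.out.println("same span:  " + server.spanId.equals(client.spanId));
    }
}
```

If this model is right, "no joined spans" should not by itself explain a different trace id on the receiving side.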

So, in summary, I am seeking guidance regarding how to configure Micrometer Tracing so that traceIds are always propagated across service boundaries, whether they are HTTP REST or message boundaries. What is the bare minimum configuration required to get this working?

UPDATE

Through extensive testing, I figured out that if you declare a RestTemplateBuilder, this causes traceId propagation to not work across HTTP REST boundaries. So, if you have a bean defined like the following and you have run across this post because your traceIds are not propagating correctly, REMOVE IT!!!

@Bean
public RestTemplateBuilder restTemplateBuilder() {
  return new RestTemplateBuilder();
}

UPDATE 2

I have added the following bean to my producer microservice in an attempt to add the traceparent header since it doesn't get added automatically yet with a StreamBridge configuration:

@Bean
@GlobalChannelInterceptor(patterns = {"*-out-0"})
public ChannelInterceptor customInterceptor(Tracer tracer) {
  return new ChannelInterceptor() {
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
      var payload = message.getPayload();
      var headers = message.getHeaders();
      return MessageBuilder.withPayload(payload)
          .setHeader("traceparent", tracer.currentSpan().context().traceId())
          .copyHeaders(headers)
          .build();
    }
  };
}

With this bean added, I see the message header arriving at the consumer microservice (I log the received headers out), but the trace id in my consumer microservice's logs does not match what is received via the traceparent header.

UPDATE 3

Is the following bean what I should implement on the consumer side for *-in-0 patterns?

@Bean
@GlobalChannelInterceptor(patterns = {"*-in-0"})
public ChannelInterceptor customInterceptor(Tracer tracer, ObservationPropagationChannelInterceptor observationPropagationChannelInterceptor) {
  return new ChannelInterceptor() {
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
      observationPropagationChannelInterceptor.preSend(message,channel);
      return message;
    }
  };
}

UPDATE 4

So, here's the latest status.

Producer

Note that I hardcoded the traceparent value for testing purposes. I was concerned that the code I had in place before only calculated the trace id and not the full traceparent value.

@Bean
@GlobalChannelInterceptor(patterns = {"*-out-0"})
public ChannelInterceptor customInterceptor(Tracer tracer) {
  return new ChannelInterceptor() {
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
      var payload = message.getPayload();
      var headers = message.getHeaders();
      return MessageBuilder.withPayload(payload)
          .setHeader(
              "traceparent",
              "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01") // tracer.currentSpan().context().traceId())
          .copyHeaders(headers)
          .build();
    }
  };
}

spring:
  cloud:
    stream:
      bindings:
        myEvent-out-0:
          content-type: application/*+avro
          destination: my-event
      function:
        autodetect: false
      kinesis:
        binder:
          auto-create-stream: true
          headers:
            - traceparent
          kpl-kcl-enabled: true
  integration:
    management:
      observation-patterns: myEvent-out-0

Consumer

@Bean
public ObservationPropagationChannelInterceptor observationPropagationChannelInterceptor(
    ObservationRegistry observationRegistry) {
  return new ObservationPropagationChannelInterceptor(observationRegistry);
}

spring:
  cloud:
    function:
      definition: myEvent
    stream:
      bindings:
        myEvent-in-0:
          consumer:
            # Note that these values are the defaults
            back-off-initial-interval: 1000
            back-off-max-interval: 10000
            back-off-multiplier: 2.0
            max-attempts: 3
          content-type: application/*+avro
          destination: my-event
          error-handler-definition: errorHandler
          group: my-event-group
        myEvent-out-0:
          content-type: application/*+avro
          destination: my-event-result
      kinesis:
        binder:
          auto-create-stream: true
          headers:
            - traceparent
          kpl-kcl-enabled: true
        bindings:
          myEvent-in-0: 
            consumer:
              checkpoint-mode: record
              listener-mode: record
  integration:
    management:
      observation-patterns: "*"

Log output in consumer

2023-02-01 | 10:59:51.477 | TaskExecutor-250 | DEBUG |com.example.MyEventStreamEventProcessorImpl | Trace: 4f909d6f9bfd860f6d8b6b54cb9245d8 | Span: e560d7749587860b | Received message header key 'traceparent' and value '00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01'

As you can see in the log output, the traceparent header is actually sent by the producer and is received by the consumer as a Kinesis header, but it is not used to populate the trace id value that you see in the log.

What else needs to be done in the consumer so that the sent traceparent value is used to populate the trace id value on the consumer? I believe I have implemented everything that has been recommended thus far.

UPDATE 5

@Bean
public Function<Message<MyEvent>, Message<MyEvent>> myEvent(MyEventProcessor myEventProcessor) {
  return myEventProcessor::processMyEvent;
}

UPDATE 6

The traceparent header does not get populated in the producer unless I have the following bean specified. Note that I have to construct the traceparent header value myself, as I couldn't find how to obtain it from an existing Spring class. Without this bean in place, the traceparent header was populated, but it had a different trace id value on the consumer side than what I see in the consumer logs.

  @Bean
  @GlobalChannelInterceptor(patterns = {"*-out-0"})
  public ChannelInterceptor customInterceptor(Tracer tracer) {
    return new ChannelInterceptor() {
      @Override
      public Message<?> preSend(Message<?> message, MessageChannel channel) {
        var payload = message.getPayload();
        var headers = message.getHeaders();
        return MessageBuilder.withPayload(payload)
            .setHeader(
                "traceparent",
                "00-"
                    + tracer.currentSpan().context().traceId()
                    + "-"
                    + tracer.currentSpan().context().spanId()
                    + "-"
                    + (tracer.currentSpan().context().sampled() == null
                            || !tracer.currentSpan().context().sampled()
                        ? "00"
                        : "01"))
            .copyHeaders(headers)
            .build();
      }
    };
  }
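
The string concatenation above could be factored into a small helper, which makes the header layout easier to read and to unit test. This is only a sketch: `TraceParentFormat` is a hypothetical class name, and it assumes the trace id and span id are already lowercase hex strings of the expected length. It also does not address the fact that `tracer.currentSpan()` can return null when no span is active, which the interceptor would need to guard against.

```java
// Hypothetical helper, not part of Spring or Micrometer.
final class TraceParentFormat {

    private TraceParentFormat() {
    }

    // Builds "version-traceId-spanId-flags" using version "00".
    // Flags are "01" when sampled is TRUE, and "00" when it is FALSE or null.
    static String format(String traceId, String spanId, Boolean sampled) {
        String flags = Boolean.TRUE.equals(sampled) ? "01" : "00";
        return String.join("-", "00", traceId, spanId, flags);
    }

    public static void main(String[] args) {
        // prints 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01
        System.out.println(
                format("4bf92f3577b34da6a3ce929d0e0e4736", "00f067aa0ba902b7", true));
    }
}
```

In the interceptor, the call would then look like `TraceParentFormat.format(context.traceId(), context.spanId(), context.sampled())`, with `context` taken once from the current span instead of calling `tracer.currentSpan()` repeatedly.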

UPDATE 7

I replaced the previous bean with the following one, and I now get the stack trace below, so this approach doesn't seem to be working unless I have something missing.

  @Bean
  public NewDestinationBindingCallback newDestinationBindingCallback(ObservationRegistry observationRegistry) {
    return (channelName, channel, producerProperties, extendedProducerProperties) -> {
      ((AbstractMessageChannel)channel).registerObservationRegistry(observationRegistry);
    };
  }

Caused by: java.lang.NullPointerException: null
    at java.base/java.util.Objects.requireNonNull(Objects.java:208)
    at io.micrometer.common.ImmutableKeyValue.<init>(ImmutableKeyValue.java:38)
    at io.micrometer.common.KeyValue.of(KeyValue.java:48)
    at io.micrometer.common.KeyValues.of(KeyValues.java:282)
    at org.springframework.integration.support.management.observation.DefaultMessageSenderObservationConvention.getLowCardinalityKeyValues(DefaultMessageSenderObservationConvention.java:42)
    at org.springframework.integration.support.management.observation.DefaultMessageSenderObservationConvention.getLowCardinalityKeyValues(DefaultMessageSenderObservationConvention.java:29)
    at io.micrometer.observation.SimpleObservation.start(SimpleObservation.java:134)
    at io.micrometer.observation.Observation.observe(Observation.java:557)
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithObservation(AbstractMessageChannel.java:338)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:321)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:297)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:183)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:144)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:140)

UPDATE 8

OK, the following bean definition works! I no longer have to specify the @GlobalChannelInterceptor on the producer side!

  @Bean
  public NewDestinationBindingCallback newDestinationBindingCallback(
      ObservationRegistry observationRegistry) {
    return (channelName, channel, producerProperties, extendedProducerProperties) -> {
      var abstractMessageChannel = ((AbstractMessageChannel) channel);
      abstractMessageChannel.setBeanName(channelName);
      abstractMessageChannel.registerObservationRegistry(observationRegistry);
    };
  }

Keith Bennett
  • Could you share a simple project for us to play with on our side? – Artem Bilan Jan 13 '23 at 17:05
  • Unfortunately, the organization I work for strictly prohibits me from sharing any kind of source code outside of general snippets like the one I included above. I would have to do it outside of working hours, so I'll try to get something together. For now, I am looking for some general guidance on what I should expect now that I have converted to the latest Spring libraries and have moved from Sleuth to Micrometer. Should the ```traceId``` be propagating across service boundaries, and if so, what is the minimal configuration required? – Keith Bennett Jan 13 '23 at 17:13

1 Answer

Not sure if this will help you, but what I have learned is that Spring Boot 3 uses W3C propagation by default: https://github.com/micrometer-metrics/tracing/wiki/Spring-Cloud-Sleuth-3.1-Migration-Guide.

Therefore, the header you need to embed into the Kinesis producer record is exactly traceparent.
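
Note that a traceparent value is not just a trace id. In the W3C Trace Context format (version 00) it has four dash-separated lowercase hex fields: version, trace id, parent (span) id, and flags. The following is a simplified shape check, not a full implementation of the specification (for example, it does not reject all-zero ids). `TraceParentValidator` is a hypothetical class name used only for illustration.

```java
// Hypothetical helper for illustration; a simplified shape check only.
final class TraceParentValidator {

    // version (2 hex) - trace id (32 hex) - parent id (16 hex) - flags (2 hex)
    private static final java.util.regex.Pattern TRACEPARENT =
            java.util.regex.Pattern.compile(
                    "^[0-9a-f]{2}-[0-9a-f]{32}-[0-9a-f]{16}-[0-9a-f]{2}$");

    private TraceParentValidator() {
    }

    static boolean isWellFormed(String value) {
        return value != null && TRACEPARENT.matcher(value).matches();
    }

    public static void main(String[] args) {
        // A complete header value has all four fields.
        System.out.println(isWellFormed(
                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"));
        // A bare trace id is missing the version, parent id, and flags.
        System.out.println(isWellFormed("31512fee6b86099affb46a7412e9e5ec"));
    }
}
```

A header that carries only the trace id would fail this check, so a receiver that expects the W3C format has no way to continue the trace from it.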

See more info in Spring Boot docs: https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#actuator.micrometer-tracing

UPDATE

To enable observation on the MessageChannel associated with your Kinesis producer from the binder, you need to add this configuration property:

spring.integration.management.observation-patterns=myEvent-out-0

The respective tracing headers are then populated into the message that is handled by the KinesisMessageHandler and are therefore embedded in the Kinesis record body.

Here is a sample to demonstrate how to propagate tracing via Kinesis Binder: https://github.com/artembilan/sandbox/tree/master/kinesis-binder-observation-demo

UPDATE 2

To propagate tracing via observation on the consumer side in Spring Cloud Stream, we have to add this bean:

    @Bean
    ConsumerEndpointCustomizer<MessageProducerSupport> consumerEndpointCustomizer(
            ObservationRegistry observationRegistry) {

        return (endpoint, destinationName, group) -> endpoint.registerObservationRegistry(observationRegistry);
    }

The point is that Spring Cloud Stream does not register endpoints as beans, so we have to instrument them manually. I am not sure yet how this can be done automatically from the framework side.

UPDATE 3

To propagate a trace on the consumer side, the Kinesis Binder itself needs a fix. Here is a workaround which demonstrates that the trace is passed from producer to consumer:

@Bean
public Consumer<Message<String>> kinesisConsumer(QueueChannel testBuffer, ObservationRegistry observationRegistry) {
    return message ->
            IntegrationObservation.HANDLER.observation(
                            null,
                            DefaultMessageReceiverObservationConvention.INSTANCE,
                            () -> new MessageReceiverContext(message, "traceHandler"),
                            observationRegistry)
                    .observe(() -> testBuffer.send(message));
}

@Bean
public QueueChannel testBuffer() {
    return new QueueChannel();
}

And I see this in logs:

2023-02-01T15:44:21.766-05:00 DEBUG [,63dacf25bcfd0eca4ec1769d243f5ab3,4ec1769d243f5ab3] 29052 --- [oundedElastic-1] o.s.i.a.outbound.KinesisMessageHandler   : org.springframework.integration.aws.outbound.KinesisMessageHandler@2caeea83 received message: GenericMessage [payload=byte[112], headers={http_requestMethod=GET, http_requestUrl=/test?name=foo, id=a571a72e-d13a-e6cf-5779-a02f863748a5, contentType=application/json, traceparent=00-63dacf25bcfd0eca4ec1769d243f5ab3-4ec1769d243f5ab3-00, timestamp=1675284261766}]
2023-02-01T15:44:24.558-05:00 DEBUG [,63dacf25bcfd0eca4ec1769d243f5ab3,b09fdf7c0e74342a] 29052 --- [esis-consumer-1] o.s.integration.channel.QueueChannel     : preSend on channel 'bean 'testBuffer'; defined in: 'com.example.kinesisbinderobservationdemo.KinesisBinderObservationDemoApplication'; from source: 'public org.springframework.integration.channel.QueueChannel com.example.kinesisbinderobservationdemo.KinesisBinderObservationDemoApplication.testBuffer()'', message: MutableMessage [payload=foo, headers={aws_shard=shardId-000000000000, traceparent=00-63dacf25bcfd0eca4ec1769d243f5ab3-b09fdf7c0e74342a-00, id=4dbe462e-1cd8-f52b-19d2-0f22f0444fcd, sourceData={SequenceNumber: 49637618706029534393522719555344717515294078114444869634,ApproximateArrivalTimestamp: Wed Feb 01 15:44:21 EST 2023,Data: java.nio.HeapByteBuffer[pos=0 lim=112 cap=112],PartitionKey: 1711100227,EncryptionType: NONE}, contentType=application/json, aws_receivedPartitionKey=1711100227, aws_receivedStream=my-event, aws_receivedSequenceNumber=49637618706029534393522719555344717515294078114444869634, timestamp=1675284264554}]
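
To read the two log lines above: the traceparent values share the same trace id and differ only in the span id, which is what a propagated trace looks like. A small sketch of that comparison follows. `TraceParentCompare` is a hypothetical helper, and it assumes four dash-separated fields as in the values logged above.

```java
// Hypothetical helper for comparing two traceparent header values.
final class TraceParentCompare {

    private TraceParentCompare() {
    }

    private static String[] fields(String traceparent) {
        String[] parts = traceparent.split("-");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Expected 4 fields: " + traceparent);
        }
        return parts;
    }

    // The trace id is the second field.
    static String traceIdOf(String traceparent) {
        return fields(traceparent)[1];
    }

    // The span (parent) id is the third field.
    static String spanIdOf(String traceparent) {
        return fields(traceparent)[2];
    }

    static boolean sameTrace(String a, String b) {
        return traceIdOf(a).equals(traceIdOf(b));
    }

    public static void main(String[] args) {
        String producer = "00-63dacf25bcfd0eca4ec1769d243f5ab3-4ec1769d243f5ab3-00";
        String consumer = "00-63dacf25bcfd0eca4ec1769d243f5ab3-b09fdf7c0e74342a-00";
        System.out.println("same trace: " + sameTrace(producer, consumer));
        System.out.println("same span:  " + spanIdOf(producer).equals(spanIdOf(consumer)));
    }
}
```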

UPDATE 4

To wrap a function call in an Observation, you can do the following:

@Bean
public Function<Message<MyEvent>, Message<MyEvent>> myEvent(MyEventProcessor myEventProcessor,
        ObservationRegistry observationRegistry) {

    return message ->
            IntegrationObservation.HANDLER.observation(
                            null,
                            DefaultMessageReceiverObservationConvention.INSTANCE,
                            () -> new MessageReceiverContext(message, "tracedFunction"),
                            observationRegistry)
                    .observe(() -> myEventProcessor.processMyEvent(message));
}

Artem Bilan
  • I will try that. I'm also having issues with just the REST ```traceId``` propagation. I created simple demo-client and demo-server projects, and the ```traceId``` propagates correctly across those, so it must be some conflict I've introduced somehow. I'm going to begin pulling in my parent pom and bom to try and nail down where the conflict gets introduced. Once I get that figured out, I will add the ```baggage``` header. I appreciate your response, and I'll post here when I find out where the conflict is, as it may help future readers of this post. – Keith Bennett Jan 13 '23 at 21:08
  • Thanks for that update. I'm still using ```org.springframework.cloud:spring-cloud-stream-binder-kinesis:2.2.0``` with these new versions of Spring Boot and Spring Cloud. In your opinion, could that be causing issues with the Micrometer Tracing? Should I wait until the new version is released on the 25th as indicated at https://calendar.spring.io before wracking my brain with this issue? My fear is this version of the Kinesis binder is pulling in libraries that may be in conflict with Micrometer Tracing. – Keith Bennett Jan 13 '23 at 22:42
  • No, it is not. The Kinesis Binder does not pull in anything that conflicts with Micrometer Tracing. You also need to see if you can enable Observation for a Kinesis Binder Producer `MessageChannel`: https://github.com/spring-projects/spring-boot/blob/v3.0.1/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationProperties.java#L426. Otherwise Spring Integration does not populate tracing headers into the message, which will be converted to a Kinesis Producer record. – Artem Bilan Jan 14 '23 at 18:40
  • I have discovered that the mere definition of the following bean causes ```traceId```s to not propagate correctly: ```@Bean @ConditionalOnClass(RestTemplateBuilder.class) public RestTemplateBuilder restTemplateBuilder() { return new RestTemplateBuilder(); }``` When I remove this bean, ```traceId``` propagation works. I don't understand why this is happening. – Keith Bennett Jan 18 '23 at 17:38
  • And when I remove the creation of my own ```RestTemplateBuilder``` and try to create the following bean, having Spring auto-configure a ```RestTemplateBuilder``` for me, I get a ```No qualifying bean...``` exception. ```@Bean public RestTemplate restTemplate(RestTemplateBuilder builder) { return builder.build(); }``` – Keith Bennett Jan 18 '23 at 17:41
  • See if you can use a `RestTemplateBuilderConfigurer` like it is done for the auto-configured one: `RestTemplateAutoConfiguration`. Not sure what is wrong with your `restTemplate` bean though... If `RestTemplateBuilder` is auto-configured, then everything should be OK, unless you try to use the `restTemplate` bean too early. This doesn't look like it still belongs to this SO thread... – Artem Bilan Jan 18 '23 at 17:56
  • I can certainly open a new SO thread to report on this issue, at least to warn others who may run into the same problem. In the meantime, can you please expand upon your earlier comment that we need to enable Observation for a Kinesis Binder Producer ```MessageChannel```? How do I do that specifically? Can you point me to a code example? – Keith Bennett Jan 18 '23 at 19:27
  • Should this approach for propagating the ```traceId``` work with ```org.springframework.cloud:spring-cloud-stream-binder-kinesis:2.2.0```? I've got these configurations in place, but I still don't see the ```traceId``` propagating across the Kinesis messaging boundary. I do have it propagating across the HTTP REST boundary now though after removing the ```RestTemplateBuilder``` I had configured as a bean and using the auto-configured one from ```RestTemplateAutoConfiguration```. – Keith Bennett Jan 18 '23 at 22:42
  • If you are on that binder version, then you should stay on Spring Boot 2.7 and Spring Cloud Sleuth. The version compatible with Spring Boot 3.0.x is going to be released next week – Artem Bilan Jan 18 '23 at 22:48
  • OK, so just to confirm, the solution you posted, where we specify ```spring.integration.management.observation-patterns=myEvent-out-0``` and ```spring.cloud.stream.kinesis.binder.headers=traceparent``` will work after the version compatible with Spring Boot 3.0.x is released next week, right? FYI, I have Spring Boot 3.0.1 and Spring Cloud 2022.0.0 working with ```org.springframework.cloud:spring-cloud-stream-binder-kinesis:2.2.0``` except for the ```traceId``` propagation. – Keith Bennett Jan 18 '23 at 22:57
  • I don't understand how that `RestTemplate` is related to `myEvent-out-0`, but I'll try to test something today with the latest Spring Boot and the latest Kinesis binder. That's why I originally asked you to share a project with us. Now this `RestTemplate` is confusing me, and I cannot determine what your function side is for the Spring Cloud Stream binding. – Artem Bilan Jan 19 '23 at 14:43
  • The ```RestTemplateBuilder``` bean issue is not related to Kinesis messaging at all. Originally, ```traceId``` propagation wasn't working at all for either the HTTP REST calls or for messaging, which is why I tagged my post with ```micrometer-tracing``` in addition to the stream-related tags. I apologize about the confusion there. I am going to post some code I've put together to recreate the ```RestTemplateBuilder``` issue in a separate SO post. – Keith Bennett Jan 19 '23 at 14:59
  • Why do you still call it `traceId` header? Is this really the name used by HTTP client instrumentation? – Artem Bilan Jan 19 '23 at 15:43
  • It's referenced as ```traceId``` in the log format definition here: https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#actuator.micrometer-tracing.getting-started – Keith Bennett Jan 19 '23 at 16:10
  • Please see my code example posted here for your reference if needed: https://stackoverflow.com/questions/75175018/defining-a-resttemplatebuilder-bean-causes-traceid-propagation-across-service-bo – Keith Bennett Jan 19 '23 at 16:12
  • That is in the log, but it is not a header, neither for HTTP nor for messaging. It is just whatever is set into the logging context by the respective tracing handler. – Artem Bilan Jan 19 '23 at 16:14
  • Why can't you share such a project on GitHub? And sorry, that one does not have Kinesis stuff in it, so it is not relevant to this story. – Artem Bilan Jan 19 '23 at 16:36
  • So, here is the sample I promised before: https://github.com/artembilan/sandbox/tree/master/kinesis-binder-observation-demo. Turns out it works as I explained. See its README to understand what needs to be done in your configuration. – Artem Bilan Jan 20 '23 at 14:59
  • After upgrading to ```org.springframework.cloud:spring-cloud-stream-binder-kinesis:3.0.0``` and specifying the ```spring.integration.management.observation-patterns=my-out-0``` and ```spring.cloud.stream.kinesis.binder.headers=traceparent``` properties, the tracing id is still not propagated across Kinesis. I am not using WebFlux as your example does. Also, I do not use the ```io.micrometer:micrometer-tracing-bridge-brave``` dependency and instead use ```io.micrometer:micrometer-tracing-bridge-otel```. – Keith Bennett Jan 27 '23 at 21:18
  • I noticed that your example still uses ```org.springframework.cloud:spring-cloud-stream-binder-kinesis:2.2.0```. I will try your project with 3.0.0 and dependencies similar to my configuration. – Keith Bennett Jan 27 '23 at 21:22
  • OK, this morning I have verified again that the ```traceparent``` header is not propagated as a Kinesis header using the configuration your example provides using a non-WebFlux approach to stream processing. Might there be additional processing or configuration needed for non-WebFlux solutions? – Keith Bennett Jan 30 '23 at 15:46
  • WebFlux means nothing there. That was just a simple way to demonstrate the feature. The main point of this tracing was exactly on a Spring Cloud Stream side of configuration. As I said before: give us a simple project to play with. – Artem Bilan Jan 30 '23 at 15:51
  • I have found that when I set ```spring.cloud.stream.kinesis.binder.kpl-kcl-enabled=true``` in your test, it doesn't work, and I'm not sure what is missing to get your test working with kpl/kcl enabled. I say this because we use kpl/kcl. Additionally, I've had to add the ```javax.xml.bind:jaxb-api:2.3.1``` dependency to my ```pom.xml``` file. Otherwise, without this dependency added, I receive a ```java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter``` exception when the microservice starts. Can you please update your example so that it works with kpl/kcl? Thank you. – Keith Bennett Jan 30 '23 at 16:50
  • The KPL doesn't work on Windows, so I cannot test that way, but technically there is no difference from the binder logic perspective. It would be great if you could debug your application, check the headers in `EmbeddedHeaderUtils.embedHeaders()`, and see if the mentioned `traceparent` is present there. The consumer side must use the KCL and this binder: the data format we send can only be consumed with the same Spring Cloud Stream protocol. – Artem Bilan Jan 30 '23 at 17:15
  • OK. I was able to run the modified sample on Linux: https://github.com/artembilan/sandbox/tree/master/kinesis-binder-observation-demo. Not sure if you can see this build, but it is indeed GREEN: https://build.spring.io/browse/INTSAMPLES-MTP-7. – Artem Bilan Jan 30 '23 at 20:16
  • In logs I see: `org.springframework.integration.aws.outbound.KplMessageHandler@5d1fe212 received message: GenericMessage [payload=byte[112], headers={http_requestMethod=GET, http_requestUrl=/test?name=foo, id=80f6cc88-6da9-eeff-397b-3b0050ec402a, contentType=application/json, traceparent=00-63d825002257c7342502c286b3ab4330-2502c286b3ab4330-00, timestamp=1675109632842}]`. Pay attention to that `traceparent` header we are interested in. – Artem Bilan Jan 30 '23 at 20:17
  • On the consumer side I see this `kinesisReceiveChannel()'', message: GenericMessage [payload=foo, headers={aws_shard=shardId-000000000000, traceparent=00-63d825002257c7342502c286b3ab4330-2502c286b3ab4330-00, id=daad51a3-a201-f6e1-ba59-18aaef58919f, contentType=application/json, aws_receivedPartitionKey=1931035493, aws_receivedStream=my-event, aws_receivedSequenceNumber=49637556395963390083079212253225006273948161346059108354, timestamp=1675109658801}]`. So, I can confirm that tracing is indeed propagated over Kinesis. – Artem Bilan Jan 30 '23 at 20:19
  • I am using ```StreamBridge``` on the producer side. Would that have any impact on this header not being propagated correctly? – Keith Bennett Jan 30 '23 at 20:27
  • As I said: the `kpl-kcl-enabled=true` is OK according to my last testing. The `StreamBridge` might not be in effect. The dynamic channel created by that might not be instrumented. Will test it shortly. – Artem Bilan Jan 30 '23 at 20:30
  • I apologize. I edited my last comment as I hadn't looked into your example when I posted it. I see that you indeed have kpl/kcl enabled. Yes, I am suspecting it might be the fact that I am using ```StreamBridge```. Thanks, Artem. – Keith Bennett Jan 30 '23 at 20:31
  • Right, `StreamBridge` does not expose its internal `DirectWithAttributesChannel` instances as beans. So, we cannot apply observation instrumentation from the framework perspective. You may look into a `NewDestinationBindingCallback` impl and instrument `AbstractMessageChannel` with a `registerObservationRegistry(ObservationRegistry)` – Artem Bilan Jan 30 '23 at 20:45
  • I am not familiar with that approach. Is there any documentation you can point me to that would provide more details and/or examples? Also, could ```StreamBridge``` expose its internal ```DirectWithAttributesChannel``` instances as beans in the future? Is this worthy of opening an issue in GitHub? I am somewhat surprised this isn't working, as with the prior versions of the binder, Spring Boot, and Spring Cloud, I was able to propagate the ```b3``` header using Spring Cloud Sleuth. – Keith Bennett Jan 30 '23 at 20:54
  • I believe it worked before because there was a special `ChannelInterceptor` applied to those channels as well. I cannot say, though, that it was correct, even if it worked in some way for you. Feel free to raise a GH issue, since this could indeed be improved one way or another for those dynamic channels as well. – Artem Bilan Jan 30 '23 at 21:59
  • I have opened a GH issue at https://github.com/spring-cloud/spring-cloud-stream/issues/2639. – Keith Bennett Jan 30 '23 at 22:14
  • And why did you "unaccept" my answer? There was nothing about `StreamBridge` in your question and I even provided for you a sample to reflect your original question requirements. See SO recommendations: https://stackoverflow.com/help/how-to-ask – Artem Bilan Jan 31 '23 at 14:54
  • I unaccepted only temporarily because I didn't want anyone else to run across this and assume that just by adding those configurations it will simply just work. Using the ```StreamBridge```, trace ids are not propagated automatically yet. I am testing out a ```GlobalChannelInterceptor``` solution right now and planned on adding more context to the answer before selecting it as answered again. Sorry, nothing personal at all. I just want to provide better clarity to others. – Keith Bennett Jan 31 '23 at 17:51
  • After adding the ```GlobalChannelInterceptor```, I am logging the received headers in the consumer, and now I see the following logged header on the consumer side: ```Received message header key 'traceparent' and value '31512fee6b86099affb46a7412e9e5ec'```. That value matches what I see logged in the producer microservice's logs. However, a new trace id is generated on the consumer in its logs, so it appears that simply adding this header via the interceptor didn't do the trick. Is there anything additional that needs to be done other than adding this header in the interceptor? – Keith Bennett Jan 31 '23 at 18:08
  • I added the ```GlobalChannelInterceptor``` bean I added above under Update 2. – Keith Bennett Jan 31 '23 at 18:18
  • See this one also: https://docs.spring.io/spring-integration/reference/html/system-management.html#observation-propagation. You probably will need to add that `ObservationPropagationChannelInterceptor` to be able to continue a trace on the consumer side. And yes, the consumer side has to be instrumented with a `spring.integration.management.observation-patterns=*` – Artem Bilan Jan 31 '23 at 18:54
  • It's not clear to me how to add the ```ObservationPropagationChannelInterceptor``` on the consumer side. Can you please provide me example code of how I can do this using a ```GlobalChannelInterceptor```? Hopefully I'm getting close on this. I appreciate it. – Keith Bennett Jan 31 '23 at 20:15
  • I have added a potential bean implementation in UPDATE 3 above. Is that what is intended to propagate this trace on the consumer side? – Keith Bennett Jan 31 '23 at 20:24
  • I don't think it is going to work on the producer side since we don't have beans for those channels in `StreamBridge`. But consumer side is OK. You probably don't need that wrapper, but just a bean for the `ObservationPropagationChannelInterceptor`. And that is in addition to the `spring.integration.management.observation-patterns=*` and `spring.cloud.stream.kinesis.binder.headers=traceparent` on the consumer side. – Artem Bilan Jan 31 '23 at 20:29
  • I have implemented these recommendations as summarized in UPDATE 4 above, but the trace id on the consumer is still a different value. What else is missing in this solution? – Keith Bennett Feb 01 '23 at 16:26
  • See an UPDATE 2 in my answer. – Artem Bilan Feb 01 '23 at 17:01
  • I added the ```ConsumerEndpointCustomizer``` bean to my consumer and still the trace id logged is different than what is received via the ```traceparent``` header. I also still have the ```ObservationPropagationChannelInterceptor``` defined on the consumer. – Keith Bennett Feb 01 '23 at 17:35
  • OK. I'll test that then in my sample... – Artem Bilan Feb 01 '23 at 17:38
  • OK. My bad. The Kinesis Binder will need to be fixed. Even if we get an observation context via that customizer, we cannot extract the current trace: that is done a bit later by the respective `EmbeddedHeadersChannelInterceptor`. And the channel is already a `PRODUCER` kind: it does not care what we have in the headers, it just sets a new value. So, "unembedding" the headers must happen in the Kinesis Consumer endpoint itself, where we set the trace as a `CONSUMER`, and that one is already about extracting the context from headers. Would you mind raising a respective GH issue for the Kinesis Binder? – Artem Bilan Feb 01 '23 at 20:37
  • I'm not sure what your consumer implementation is, but that is where, at the moment, we can do a workaround by starting a new `Observation` via `IntegrationObservation.HANDLER`. See an UPDATE 3 in my answer. – Artem Bilan Feb 01 '23 at 20:45
  • New issue opened and available at https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/issues/185. I will take a look at your UPDATE 3 next. Thank you, Artem. – Keith Bennett Feb 01 '23 at 20:56
  • I have a ```Function``` bean defined in my consumer that consumes a message and produces another result message. I have posted it for your reference under UPDATE 5 above. I am having difficulty understanding what I am supposed to implement after taking a look at your solution posted in UPDATE 3. – Keith Bennett Feb 01 '23 at 22:17
  • See an UPDATE 4 in my answer. – Artem Bilan Feb 02 '23 at 15:57
  • After incorporating your answer in UPDATE 4, while I now have the traceparent header propagating over Kinesis and populating the trace id in my consumer logs, what is odd that I just noticed is that the trace id now does not match what I see logged in the producer. – Keith Bennett Feb 02 '23 at 19:34
  • To add more context, I removed the ```@GlobalChannelInterceptor``` that I had previously defined. Do I still need it in place on the producer to correctly populate the ```traceparent``` header? If so, how do I properly construct that value? My understanding is that a ```traceparent``` value is a value formatted as ```version-trace_id-parent_id-trace_flags```, and I'm not sure where this value can be obtained when I look at the ```TraceContext```. It looks like I would have to construct this value myself. – Keith Bennett Feb 02 '23 at 19:44
  • Hold on. The `spring.integration.management.observation-patterns=*-out-0` doesn't work because it is `StreamBridge`, but you say that `ChannelInterceptor` works... – Artem Bilan Feb 02 '23 at 20:38
  • I see. There is a `StreamBridge.addInterceptors()` feature. Why then doesn't a `NewDestinationBindingCallback` bean with logic like `registerObservationRegistry(ObservationRegistry)` on a `MessageChannel` work for you? – Artem Bilan Feb 02 '23 at 20:43
  • Please see my UPDATE 6 above. With this bean now in place, together with the code you provided in UPDATE 4, the trace id logged in the producer and consumer now match. Of course, this is a lot of boilerplate code that I assume will be moved into the binder. As for your last comment, how could I implement that callback bean? I could definitely try that next. – Keith Bennett Feb 02 '23 at 20:48
  • UPDATE 7 contains the result of adding that ```NewDestinationBindingCallback``` bean. It didn't work for me unless I'm missing something. When I added it, I removed the ```@GlobalChannelInterceptor```. – Keith Bennett Feb 02 '23 at 21:09
  • Your UPDATE 7 is correct. And that stack trace confirms that an observation is applied on an internal channel used by `StreamBridge`. The remaining problem is with `context.getProducerName()`, which is `null` for this internal channel instance. Consider calling `setBeanName()` on that channel manually as well, alongside setting an `ObservationRegistry` in your `NewDestinationBindingCallback` impl. And yes: most of this will go to the framework code soon enough. Thank you for your effort and great feedback! – Artem Bilan Feb 02 '23 at 21:38
  • OK, see my UPDATE 8 above! Your suggestion to name the bean worked! Nice! – Keith Bennett Feb 02 '23 at 22:41
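
For reference, on the `traceparent` format raised in the comments (`version-trace_id-parent_id-trace_flags`): below is a minimal sketch of how such a value is laid out under version `00` of the W3C Trace Context format, assuming the ids are already available as lowercase hex strings. The class `TraceParentSketch` and its methods are hypothetical names used only for illustration, and the span id `00f067aa0ba902b7` is made up; only the trace id comes from the logged header quoted earlier in this thread. In a real application the configured propagator is expected to write this header, so this is a way to sanity-check a logged value rather than a replacement for that.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper illustrating the W3C "traceparent" layout (version 00). */
public final class TraceParentSketch {

    // version(2 hex) - trace_id(32 hex) - parent_id(16 hex) - trace_flags(2 hex)
    private static final Pattern TRACEPARENT =
            Pattern.compile("^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$");

    private static final String ZERO_TRACE_ID = "00000000000000000000000000000000";
    private static final String ZERO_SPAN_ID = "0000000000000000";

    private TraceParentSketch() {
    }

    /** Returns true if the value is a well-formed version-00 traceparent. */
    public static boolean isValid(String header) {
        if (header == null) {
            return false;
        }
        Matcher m = TRACEPARENT.matcher(header);
        // All-zero trace ids and parent ids are not allowed.
        return m.matches()
                && !ZERO_TRACE_ID.equals(m.group(1))
                && !ZERO_SPAN_ID.equals(m.group(2));
    }

    /** Assembles a traceparent value from a trace id, a span id, and the sampled flag. */
    public static String format(String traceId, String spanId, boolean sampled) {
        String candidate = "00-" + traceId + "-" + spanId + "-" + (sampled ? "01" : "00");
        if (!isValid(candidate)) {
            throw new IllegalArgumentException("Not a valid traceparent: " + candidate);
        }
        return candidate;
    }

    /** Extracts the trace id portion from a valid traceparent value. */
    public static String traceIdOf(String header) {
        if (!isValid(header)) {
            throw new IllegalArgumentException("Not a valid traceparent: " + header);
        }
        return header.substring(3, 35);
    }

    public static void main(String[] args) {
        String traceId = "31512fee6b86099affb46a7412e9e5ec"; // value logged in this thread
        String spanId = "00f067aa0ba902b7";                  // made-up span id

        // prints 00-31512fee6b86099affb46a7412e9e5ec-00f067aa0ba902b7-01
        System.out.println(format(traceId, spanId, true));

        // A bare trace id is not a complete traceparent value: prints false
        System.out.println(isValid(traceId));
    }
}
```

Note that a bare 32-character trace id, like the header value logged after the first `GlobalChannelInterceptor` attempt, does not pass this check, since it lacks the version, parent id, and flags segments.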