I'm using Spring Cloud Stream 3.x in a Spring Boot 2.x application to consume messages from a Kafka topic.

I want a listener that consumes messages only when a custom header has a particular value, as described in the documentation:

@StreamListener(value = "someTopic", condition = "headers['SomeHeader']=='SomeHeaderValue'")
public void onMessage(Message<?> message) {
  LOGGER.info("Received: {}", message);
}

However, the listener is never invoked, and if I remove the condition I see the following in the log:

Received: ... SomeHeader: [B@1055e4af ...

It turns out that custom headers are left in Kafka's raw byte-array format, so they cannot be compared against a String when the condition is evaluated.
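To illustrate why the comparison fails, here is a minimal plain-Java sketch (no Spring involved). The class and method names are made up for illustration, and `matchesRaw` only approximates what the condition ends up doing when the header value is still a byte[]:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of Spring or Kafka.
class RawHeaderDemo {

    // Compares the header value as-is against the expected String.
    // A byte[] is never equal to a String, so this is false for raw headers.
    static boolean matchesRaw(Object headerValue, String expected) {
        return expected.equals(headerValue);
    }

    // Decodes the raw bytes to a String first, then compares.
    static boolean matchesDecoded(byte[] headerValue, String expected) {
        return expected.equals(new String(headerValue, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] raw = "SomeHeaderValue".getBytes(StandardCharsets.UTF_8);

        // Printing a byte[] yields an identity string of the form [B@<hash>,
        // which is what shows up in the log line above.
        System.out.println(String.valueOf(raw));

        System.out.println(matchesRaw(raw, "SomeHeaderValue"));
        System.out.println(matchesDecoded(raw, "SomeHeaderValue"));
    }
}
```

The sketch assumes the header was written as UTF-8 text; if the producer uses a different encoding, the decoding step has to match it.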

Is some additional configuration needed or am I missing something?

Oleg Efimov

1 Answer

After some digging through the sources and Stack Overflow, I found the following:

So I added a custom header mapper bean that maps my custom header to a String. The bean name is important: naming it kafkaBinderHeaderMapper makes it possible to omit the additional configuration property.

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    SimpleKafkaHeaderMapper headerMapper = new SimpleKafkaHeaderMapper();
    // true = map the inbound header value to a String instead of leaving it as byte[]
    headerMapper.setRawMappedHeaders(Map.of(
        "SomeHeader", true
    ));
    return headerMapper;
}

That fixed the problem:

Received: ... SomeHeader: SomeHeaderValue ...
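As a rough mental model of what the raw-mapped-headers setting does on the inbound side, here is a simplified plain-Java sketch. `mapInbound` is a hypothetical helper written for illustration, not the actual Spring Kafka implementation, and it assumes UTF-8 header values:

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not part of Spring Kafka.
class RawMappedHeadersSketch {

    // Headers flagged with true are decoded to a String;
    // all other headers are passed through as raw byte[].
    static Map<String, Object> mapInbound(Map<String, byte[]> kafkaHeaders,
                                          Map<String, Boolean> rawMappedHeaders) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, byte[]> entry : kafkaHeaders.entrySet()) {
            boolean asString = Boolean.TRUE.equals(rawMappedHeaders.get(entry.getKey()));
            Object value = asString
                    ? new String(entry.getValue(), StandardCharsets.UTF_8)
                    : entry.getValue();
            result.put(entry.getKey(), value);
        }
        return result;
    }
}
```

With "SomeHeader" flagged as true, the mapped value is a String, so a condition such as headers['SomeHeader']=='SomeHeaderValue' has something it can compare against.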

P.S. It seems like a bug in Spring Cloud Stream:

  1. It introduces its own header mapper implementation (BinderHeaderMapper), but that implementation does not take the conditional routing feature into account.
  2. The header mapper is subclassed in KafkaMessageChannelBinder; this added behaviour is non-obvious and is lost if a custom header mapper is provided.
Oleg Efimov
  • You should "accept" your own answer. You could also use `condition = "new String(headers['SomeHeader'])=='SomeHeaderValue'"` – Gary Russell Dec 17 '20 at 14:48
  • Yep, I can accept it in two days :) As for `new String` in condition -- usable and shortest solution, but IMHO a bit ugly. – Oleg Efimov Dec 17 '20 at 18:10
  • BTW @GaryRussell what do you think of the points in p.s.? – Oleg Efimov Dec 17 '20 at 18:19
  • I don't remember why we had to reintroduce that mapper but `setRawMappedHeaders` is on the `AbstractKafkaHeaderMapper` so it's available on the `BinderHeaderMapper` too. – Gary Russell Dec 17 '20 at 18:42