0

This is sort of sequel to this question. Can I use "plain" Apache Kafka Binder together with functional model? So far using annotation based configuration I mixed both, spring-cloud-stream-binder-kafka for simple consuming / producing and spring-cloud-stream-binder-kafka-streams for advanced stream processing in one application.

Functional model seems to be supported only by streams binder, and if I try to mix both approaches - annotation based for simple usage and functional for streams, stream binding is not registered.

spring.cloud:
        stream:
          function:
            definition: processStream
          bindings:
            processStream-in-0:
              destination:  my-topic
            simple-binding-in:
              destination: another-topic

public interface SimpleBinding {

    String INPUT = "simple-binding-in";

    @Input(INPUT)
    SubscribableChannel simpleIn();

}

@Component
public class SimpleListener {

    @StreamListener(SimpleBinding.INPUT)
    public void listen(@Payload SomeDto payload) {
    }
}

@Configuration
public class FunctionalStream {

    @Bean
    public Consumer<KStream<String>> processStream() {
        return eventStream -> eventStream.map()
    }
}

@EnableBinding(SimpleBinding.class) is present on configuration class. Is it preferred / supported to mix both as described or should I use streams-binder even for simple message consumption?

marc_s
  • 732,580
  • 175
  • 1,330
  • 1,459
B.Gen.Jack.O.Neill
  • 8,169
  • 12
  • 51
  • 79

1 Answers1

0

For Kafka Binder you can and absolutely should use functional model and forget about StreamListener all together. This way it's going to be aligned with your KStream functional model.

spring.cloud:
        stream:
          function:
            definition: processStream
          bindings:
            processStream-in-0:
              destination:  my-topic
            listen-in-0:
              destination: another-topic

@Component
public class SimpleListener {

    @Bean
    public Consumer<SomeDto> listen() {
        return payload -> ...
    }
}

@Configuration
public class FunctionalStream {

    @Bean
    public Consumer<KStream<String>> processStream() {
        return eventStream -> eventStream.map()
    }
}
Oleg Zhurakousky
  • 5,820
  • 16
  • 17
  • This seems to work, but I have found another problem when used with prometheus registry "Prometheus requires that all meters with the same name have the same set of tag keys". Meter 'kafka_metrics_count_count' is registered both with and without tag clientId. Why in this case you don´t have to specify listen in function.definition? – B.Gen.Jack.O.Neill Oct 16 '19 at 11:42
  • `Why in this case you don´t have to specify listen in function.definition?` - not sure what you're asking – Oleg Zhurakousky Oct 16 '19 at 12:12
  • Ohh i see, actually you do need `definition` property. In your case it should be function names delimited by `'`. `....definition: processStream;listen` – Oleg Zhurakousky Oct 16 '19 at 12:16
  • I thought this configuration works because my test passed for processStream(), but I still cannot get SimpleListener.listen() to bind properly. Stream works as expected but no consumer is created for listen. – B.Gen.Jack.O.Neill Oct 16 '19 at 12:27
  • Wait, change SimpleListener class to `@Configuration` – Oleg Zhurakousky Oct 16 '19 at 12:57
  • It makes no difference, tried both. Bean is created but without Kafka binding (consumer). – B.Gen.Jack.O.Neill Oct 16 '19 at 13:01
  • Can you create a sample project that reproduces it and push it to github? I can't seem to reproduce it on my end – Oleg Zhurakousky Oct 16 '19 at 13:05
  • Sure, https://github.com/vitjouda/examples/tree/master/functional-kafka-binder, KStream is commented out so it is clear no consumer is created for this function. – B.Gen.Jack.O.Neill Oct 16 '19 at 14:36
  • @B.Gen.Jack.O.Neill This turned out to be an issue we needed to fix. Now it is fixed and the latest snapshot (3.0) will have it. – sobychacko Oct 18 '19 at 20:13
  • hey @OlegZhurakousky how are we supposed to test this? I can't find examples to test Spring Cloud Functions with Kafka binder – nmarques May 08 '20 at 11:10
  • 1
    @nmarques https://github.com/spring-cloud/spring-cloud-stream-binder-kafka project is full of tests that use embedded kafka – Oleg Zhurakousky May 11 '20 at 19:23