I have a spring-cloud-stream application with a Kafka binding. I would like to send and receive messages on the same topic from within the same executable (jar). My channel definitions are as follows:

public interface ChannelDefinition {

   @Input("forum")
   public SubscribableChannel readMessage();

   @Output("forum")
   public MessageChannel postMessage();
}

I use @StreamListener to receive messages, and I see several unexpected behaviors:

  1. At times I get "No dispatcher found for unknown.message.channel" for every other message.
  2. If I attach a command-line Kafka subscriber to the forum topic, it receives only every other message.
  3. My application receives the remaining messages, that is, exactly the ones the command-line subscriber does not see. I have made sure that my application subscribes under a specific group name.

Is there a working example of this use case?

Jimm

3 Answers

This is the wrong way to define bindable channels, because the name forum is used for both. The framework should be more thorough and fail fast here, but as written you are binding both the input and the output to the same channel, which creates a competing consumer within your application. That also explains why you see only alternate messages.
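
To make the competing-consumer effect concrete, here is a minimal plain-Java simulation. It is only a sketch and uses no Spring classes: RoundRobinChannel and SharedChannelSketch are hypothetical names, and the sketch assumes the shared channel hands each message to exactly one subscriber in rotation (round-robin is the default load-balancing strategy of Spring Integration's DirectChannel). One subscriber stands in for the @StreamListener method, the other for the binder handler that forwards messages to Kafka.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java simulation; none of these types come from Spring.
class SharedChannelSketch {

    // Hypothetical stand-in for a point-to-point channel that delivers each
    // message to exactly one subscriber, rotating between subscribers.
    static class RoundRobinChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();
        private int next = 0;

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        void send(String message) {
            if (subscribers.isEmpty()) {
                throw new IllegalStateException("no subscribers");
            }
            Consumer<String> target = subscribers.get(next % subscribers.size());
            next++;
            target.accept(message);
        }
    }

    // Sends `count` messages through one shared channel and returns two lists:
    // index 0 = messages seen by the listener, index 1 = messages forwarded to the topic.
    static List<List<String>> run(int count) {
        List<String> listener = new ArrayList<>();
        List<String> topic = new ArrayList<>();
        RoundRobinChannel forum = new RoundRobinChannel();
        forum.subscribe(listener::add); // stands in for the @StreamListener method
        forum.subscribe(topic::add);    // stands in for the binder's outbound handler
        for (int i = 1; i <= count; i++) {
            forum.send("m" + i);
        }
        return List.of(listener, topic);
    }

    public static void main(String[] args) {
        List<List<String>> result = run(6);
        System.out.println("listener: " + result.get(0)); // [m1, m3, m5]
        System.out.println("topic:    " + result.get(1)); // [m2, m4, m6]
    }
}
```

Under these assumptions the listener and the topic each see a disjoint half of the messages, which matches the symptoms in the question: an external subscriber on the topic gets every other message, and the application gets the rest without them ever reaching Kafka.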

What you should do is:

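import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface ChannelDefinition {

   // With no value on the annotation, the channel name defaults to the method name.
   @Input
   public SubscribableChannel readMessage();

   @Output
   public MessageChannel postMessage();
}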

And then use application properties to bind both channels to the same destination (the forum topic):

spring.cloud.stream.bindings.readMessage.destination=forum
spring.cloud.stream.bindings.postMessage.destination=forum
Marius Bogoevici
  • I added an answer below with code on how to subscribe to the input based on this answer: https://stackoverflow.com/questions/43128803/can-i-bind-to-multiple-consumer-groups-with-spring-cloud-stream. – Tony Zampogna Aug 17 '18 at 21:33

Along with the answer above by Marius Bogoevici, here's an example of how to listen to that Input.

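@StreamListener
// "input" must match the bound channel name; with the ChannelDefinition
// interface above, that name would be "readMessage".
public void handleNewOrder(@Input("input") SubscribableChannel input) {
    logger.info("Subscribing...");
    input.subscribe((message) -> {
        logger.info("Received new message: {}", message);
    });
}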
Tony Zampogna

For me, consuming from "input" didn't work. I had to pass the channel name (the method name from the binding interface) to @StreamListener and add @EnableBinding, as below:

@Slf4j
@RequiredArgsConstructor
@EnableBinding(value = Channels.class)
public class Consumer {
    
    @StreamListener("readMessage")
    public void retrieve(Something req) {
        log.info("Received {{}}", req);
    }

}
Sercan Ozdemir