
I am trying to use Spring Integration to send MQTT messages to a broker, and I want to do it through a gateway interface.

@Bean
public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    //set the factory details
    return factory;
}

@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler messageHandler =
            new MqttPahoMessageHandler("randomString", mqttClientFactory());
    //set handler details
    messageHandler.setDefaultTopic(topic);
    return messageHandler;
}

@Bean
public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
}
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
private interface MyGateway {
    void sendToMqtt(String data);
}

My question is: if I want to use the gateway to send messages to different topics, how can I do that without creating an adapter for each topic?

Thanks.

Hope I formulated my question clearly and the code is properly formatted.

1 Answer


You need to set the target topic in a message header.

Here is one way to do that...

void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);

The gateway proxy will assemble the message with the header, which is then used by the outbound adapter.
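To make the routing rule concrete, here is a minimal plain-Java sketch (no Spring dependency) of how a per-message topic header can take precedence over a default topic. It is a simplified model and not Spring Integration's actual code: the class `TopicRoutingSketch` and the methods `resolveTopic` and `headersFor` are hypothetical names used only for illustration. The header key `mqtt_topic` is the string value behind `MqttHeaders.TOPIC`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of header-based topic selection.
// This is NOT the real MqttPahoMessageHandler implementation.
final class TopicRoutingSketch {

    // String value of MqttHeaders.TOPIC in Spring Integration.
    static final String TOPIC_HEADER = "mqtt_topic";

    // Use the topic from the message headers if present, otherwise the default topic.
    static String resolveTopic(Map<String, Object> headers, String defaultTopic) {
        Object topic = headers.get(TOPIC_HEADER);
        return topic instanceof String ? (String) topic : defaultTopic;
    }

    // Stands in for the gateway proxy copying the @Header argument into the message headers.
    static Map<String, Object> headersFor(String topic) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(TOPIC_HEADER, topic);
        return headers;
    }

    public static void main(String[] args) {
        // Header present: the per-message topic is used.
        System.out.println(resolveTopic(headersFor("sensors/temperature"), "fallback/topic"));
        // Header absent: the default topic is used.
        System.out.println(resolveTopic(new HashMap<>(), "fallback/topic"));
    }
}
```

With the real gateway, a call such as `sendToMqtt("21.5", "sensors/temperature")` would play the role of `headersFor(...)` here, so a single outbound adapter can serve any number of topics.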

Gary Russell
  • This does not provide an answer to the question. To critique or request clarification from an author, leave a comment below their post. - [From Review](/review/low-quality-posts/22249405) – georgeawg Feb 19 '19 at 19:47
  • It actually does answer the question, but I'll add some more words. – Gary Russell Feb 19 '19 at 21:01
  • That is exactly what I was looking for. I found that @header(topic) annotation but I was not sure how to use it. Do you know where I could find the documentation for this in Spring as well? I've been searching for it for a couple of days. – Nume Prenume Feb 20 '19 at 08:32
  • The use of `@Header` is [documented here](https://docs.spring.io/spring-integration/reference/html/#gateway). The MQTT outbound channel adapter is [documented here](https://docs.spring.io/spring-integration/reference/html/#mqtt-outbound), which says: `defaultTopic - The default topic to which the message is sent (used if no mqtt_topic header is found).` The same is stated in the javadocs for `setDefaultTopic()`. – Gary Russell Feb 20 '19 at 14:18
  • @GaryRussell, please, also check https://stackoverflow.com/questions/60456751/kafka-producer-how-to-change-a-topic-without-down-time-and-preserving-message – Yan Khonski Mar 02 '20 at 11:33