
I am using Spring's spring-integration-mqtt. I can connect to a single MQTT server and receive messages on the subscribed topics. Now I want to build an application that connects to multiple MQTT servers and receives data from every connection, and I want to manage this dynamically so that I can add more MQTT servers from a database or a text file.

A simple bean for a single MQTT connection with subscriptions is as follows:

@Bean
public MessageProducer inbound() {

    MqttPahoMessageDrivenChannelAdapter adapter2 =
            new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883","mqtt_virtual_received_sus_2",
                                             "DATA/#", "LD/#","CONF/#","CONFIG/#");
    adapter2.setCompletionTimeout(0);
    adapter2.setConverter(new DefaultPahoMessageConverter());
    adapter2.setQos(2);

    adapter2.setOutputChannel(mqttInputChannel());
    return adapter2;

}

The code above creates a connection to the MQTT server and receives messages. If I copy the same code for a second server with a different MQTT IP address, I can connect to both MQTT servers, as follows:

@Bean
public MessageProducer inbound() {

    MqttPahoMessageDrivenChannelAdapter adapter2 =
            new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883","mqtt_virtual_received_sus_2",
                                             "DATA/#", "LD/#","CONF/#","CONFIG/#");
    adapter2.setCompletionTimeout(0);
    adapter2.setConverter(new DefaultPahoMessageConverter());
    adapter2.setQos(2);

    adapter2.setOutputChannel(mqttInputChannel());
    return adapter2;

}

@Bean
public MessageProducer inbound2() {

    MqttPahoMessageDrivenChannelAdapter adapter2 =
            new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.14:1883","mqtt_virtual_received_sus_1",
                                             "DATA/#", "LD/#","CONF/#","CONFIG/#");
    adapter2.setCompletionTimeout(0);
    adapter2.setConverter(new DefaultPahoMessageConverter());
    adapter2.setQos(2);

    adapter2.setOutputChannel(mqttInputChannel());
    return adapter2;

}

The code above also works fine, and I can receive messages from both MQTT servers. But is there a way to manage this dynamically, as in the following? I changed the bean's return type to a list, but it didn't work:

  @Bean
  public List<MqttPahoMessageDrivenChannelAdapter> getAdapter () {
      List<MqttPahoMessageDrivenChannelAdapter > logConfList=new ArrayList<MqttPahoMessageDrivenChannelAdapter>();
      MqttPahoMessageDrivenChannelAdapter adapter2 =
              new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883","mqtt_virtual_received_sus_2",
                                               "DATA/#", "LD/#","CONF/#","CONFIG/#");
      adapter2.setCompletionTimeout(0);
      adapter2.setConverter(new DefaultPahoMessageConverter());
      adapter2.setQos(2);

      adapter2.setOutputChannel(mqttInputChannel() );

      MqttPahoMessageDrivenChannelAdapter adapter =
              new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.14:1883","mqtt_virtual_received_sus_1",
                                               "DATA/#", "LD/#","CONF/#","CONFIG/#");
      adapter.setCompletionTimeout(0);
      adapter.setConverter(new DefaultPahoMessageConverter());
      adapter.setQos(2);

      adapter.setOutputChannel(mqttInputChannel() );
      logConfList.add(adapter);
      logConfList.add(adapter2);

      return logConfList;

  }

Is there any way I can manage these beans dynamically, fetching the MQTT server details from a text file and managing multiple connections in a for loop or something similar?

Gary Russell
Rawat

1 Answer


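See Dynamic and Runtime Integration Flows in the Spring Integration Java DSL documentation. You can register one flow per server at runtime through the IntegrationFlowContext: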

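// Manages integration flows that are registered at runtime.
@Autowired
private IntegrationFlowContext flowContext;

// Creates an inbound adapter for one server, wires it to the given channel,
// and registers the resulting flow with the application context.
private IntegrationFlowRegistration addAnAdapter(String uri, String clientId, MessageChannel channel,
        String... topics) {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(uri, clientId, topics);
    // more adapter configuration
    IntegrationFlow flow = IntegrationFlows.from(adapter)
        .channel(channel)
        .get();
    return this.flowContext.registration(flow).register();
}

// Removes a previously registered flow, using the registration returned above.
private void removeAdapter(IntegrationFlowRegistration flowReg) {
    this.flowContext.remove(flowReg.getId());
}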
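
To drive the methods above from a text file, as the question asks, something like the following might work. This is only a sketch under stated assumptions: the class MqttServerSpecs, its Spec type, and the line format "uri,clientId,topic1;topic2" are all hypothetical and not part of Spring Integration. It uses only the JDK, so the actual registration call is passed in as a function.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper, not part of Spring Integration.
final class MqttServerSpecs {

    /** One broker entry: URI, client id, and the topics to subscribe to. */
    static final class Spec {
        final String uri;
        final String clientId;
        final String[] topics;

        Spec(String uri, String clientId, String[] topics) {
            this.uri = uri;
            this.clientId = clientId;
            this.topics = topics;
        }
    }

    private MqttServerSpecs() {
    }

    /**
     * Parses lines of the assumed form "uri,clientId,topic1;topic2;...".
     * Blank lines and lines starting with '#' are skipped.
     */
    static List<Spec> parse(List<String> lines) {
        List<Spec> specs = new ArrayList<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            // Limit of 3 keeps the whole topic list in the last part.
            String[] parts = line.split(",", 3);
            if (parts.length != 3) {
                throw new IllegalArgumentException("Expected uri,clientId,topics but got: " + line);
            }
            String[] topics = parts[2].trim().split("\\s*;\\s*");
            specs.add(new Spec(parts[0].trim(), parts[1].trim(), topics));
        }
        return specs;
    }

    /**
     * Calls the supplied register function once per spec and keeps the results
     * keyed by client id, so an entry can be looked up and removed later.
     */
    static <R> Map<String, R> registerAll(List<Spec> specs, Function<Spec, R> register) {
        Map<String, R> registrations = new LinkedHashMap<>();
        for (Spec spec : specs) {
            registrations.put(spec.clientId, register.apply(spec));
        }
        return registrations;
    }
}
```

Inside the configuration class, the lines could come from Files.readAllLines on a file of your choosing, and the function passed to registerAll could be s -> addAnAdapter(s.uri, s.clientId, mqttInputChannel(), s.topics). Keeping the returned map lets you later pass the stored registration for a given client id to removeAdapter.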
Gary Russell
  • The link above is returning a 404 error. Here's an [updated link](https://spring.io/blog/2016/07/08/java-dsl-for-spring-integration-1-2-m1-and-1-1-3-are-available) – James Jan 28 '20 at 13:51
  • Thanks; your link is to the blog; the broken link was to the documentation; fixed. – Gary Russell Jan 28 '20 at 14:04