1
@EnableIntegration
@Configuration
@ConfigurationProperties(prefix = "mqtt")
@IntegrationComponentScan(basePackages = "org.sample.mqtt")
public class MqttConfig {

    // NOTE(review): these fields are presumably populated from "mqtt.*" properties via the
    // class-level @ConfigurationProperties; the setters are not visible in this excerpt — confirm.

    // Broker URIs handed to MqttConnectOptions.setServerURIs in mqttClientFactory().
    private String[] serverUris;
    // User name handed to MqttConnectOptions.setUserName in mqttClientFactory().
    private String username;
    // Password handed to MqttConnectOptions.setPassword in mqttClientFactory().
    private char[] password;
    // Value handed to MqttConnectOptions.setKeepAliveInterval (unit as defined by Paho — confirm).
    private int keepAliveInterval;
    // Topics passed to the inbound channel adapter's constructor in mqttInbound().
    private String[] subTopics;
    // Mapper implementation instantiated reflectively in bytesMessageConverter();
    // it must expose a public constructor taking a single String.
    private Class<? extends BytesMessageMapper> messageMapper;
    // Prefix of the MQTT client ids; "_outbound" / "_inbound" is appended per adapter.
    private String clientIdPrefix;
    // Passed as the single constructor argument when the mapper is instantiated.
    private String modelPackages;

    /**
     * Builds the Paho client factory used by both the outbound handler and the
     * inbound adapter defined in this configuration.
     *
     * <p>The server URIs and keep-alive interval are always applied. The user name
     * and password are applied only when they have been configured, so the factory
     * can also be used against a broker that accepts anonymous connections.
     *
     * @return a client factory whose connection options mirror this configuration
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(serverUris);
        options.setKeepAliveInterval(keepAliveInterval);
        if (username != null) {
            options.setUserName(username);
        }
        if (password != null) {
            // NOTE(review): Paho's setter is believed to copy the array and would fail on
            // null; skipping the call simply leaves the option unset — confirm against the
            // Paho version in use.
            options.setPassword(password);
        }

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }

    /**
     * Creates the MQTT message converter backed by the configured mapper class.
     *
     * <p>The mapper is instantiated reflectively through its public constructor that
     * takes a single {@code String}, which receives {@code modelPackages}.
     *
     * @return a {@code BytesMessageConverter} wrapping the freshly created mapper
     * @throws NoSuchMethodException if the mapper class has no public {@code (String)} constructor
     */
    @Bean
    public MqttMessageConverter bytesMessageConverter() throws NoSuchMethodException {
        java.lang.reflect.Constructor<? extends BytesMessageMapper> mapperConstructor =
                messageMapper.getConstructor(String.class);
        BytesMessageMapper mapper = BeanUtils.instantiateClass(mapperConstructor, modelPackages);
        return new BytesMessageConverter(mapper);
    }

    /**
     * Defines the outbound MQTT handler that consumes messages from
     * {@code mqttOutboundChannel}.
     *
     * <p>The client id is {@code clientIdPrefix} followed by {@code "_outbound"}.
     * The handler is switched to async mode with a completion timeout of 5000.
     *
     * @param mqttMessageConverter converter installed on the handler
     * @return the configured outbound message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttMessageConverter mqttMessageConverter) {
        String outboundClientId = clientIdPrefix + "_outbound";
        MqttPahoMessageHandler handler =
                new MqttPahoMessageHandler(outboundClientId, mqttClientFactory());
        handler.setAsync(true);
        handler.setCompletionTimeout(5000);
        handler.setConverter(mqttMessageConverter);
        return handler;
    }

    /**
     * Defines the inbound MQTT adapter that subscribes to {@code subTopics} and
     * forwards what it receives to {@link #mqttInboundChannel()}.
     *
     * <p>The client id is {@code clientIdPrefix} followed by {@code "_inbound"}.
     * The adapter uses QoS 1 and a completion timeout of 5000.
     *
     * @param mqttMessageConverter converter installed on the adapter
     * @return the configured message-driven channel adapter
     */
    @Bean
    public MessageProducer mqttInbound(MqttMessageConverter mqttMessageConverter) {
        String inboundClientId = clientIdPrefix + "_inbound";
        MqttPahoMessageDrivenChannelAdapter inboundAdapter =
                new MqttPahoMessageDrivenChannelAdapter(inboundClientId, mqttClientFactory(), subTopics);
        inboundAdapter.setQos(1);
        inboundAdapter.setCompletionTimeout(5000);
        inboundAdapter.setConverter(mqttMessageConverter);
        inboundAdapter.setOutputChannel(mqttInboundChannel());
        return inboundAdapter;
    }

    /**
     * Channel that feeds the outbound MQTT handler; it is referenced by name from
     * the {@code @ServiceActivator} on {@code mqttOutbound}.
     *
     * @return a new {@code DirectChannel}
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        MessageChannel outboundChannel = new DirectChannel();
        return outboundChannel;
    }

    /**
     * Channel that the inbound MQTT adapter uses as its output channel.
     *
     * @return a new {@code DirectChannel}
     */
    @Bean
    public MessageChannel mqttInboundChannel() {
        MessageChannel inboundChannel = new DirectChannel();
        return inboundChannel;
    }


MqttClient client = new MqttClient(serverUrl, clientId, new MemoryPersistence());

With the plain Paho client, multiple MqttClient instances can be created, one per clientId, and maintained in a HashMap. How can we achieve the same effect with spring-integration-mqtt?

How do I create multiple MQTT clients through HTTP requests and maintain them in a HashMap?

IntegrationFlowContext allows flows to be registered dynamically: flowContext.registration(flow).register();

But I still do not know how to handle and use it. How can each MQTT client be maintained and managed so that one can be selected by its HashMap key to publish data?

mkt
  • 11
  • 2
  • Please do not post screenshots of code, they are hard to read, impossible for those that use screen readers and can not be searched. Please [edit](https://stackoverflow.com/posts/73455283/edit) the question to post the actual text and use the toolbar to format it. – hardillb Aug 23 '22 at 08:20

0 Answers0