/**
 * Spring configuration for MQTT messaging through Spring Integration.
 *
 * <p>The fields below hold the settings used by the bean methods of this class;
 * the class is annotated with {@code @ConfigurationProperties(prefix = "mqtt")}.
 *
 * <p>NOTE(review): no setters are visible in this excerpt. Property binding onto
 * these private fields presumably relies on setters declared elsewhere (or
 * generated) - confirm.
 */
@EnableIntegration
@Configuration
@ConfigurationProperties(prefix = "mqtt")
@IntegrationComponentScan(basePackages = "org.sample.mqtt")
public class MqttConfig {
// Broker URIs handed to MqttConnectOptions#setServerURIs in mqttClientFactory().
private String[] serverUris;
// User name set on the connect options in mqttClientFactory().
private String username;
// Password set on the connect options in mqttClientFactory().
private char[] password;
// Keep-alive interval set on the connect options in mqttClientFactory().
private int keepAliveInterval;
// Topics passed to the inbound channel adapter in mqttInbound().
private String[] subTopics;
// Mapper class instantiated through its single-String constructor in bytesMessageConverter().
private Class<? extends BytesMessageMapper> messageMapper;
// Prefix of the MQTT client ids; "_outbound" / "_inbound" is appended by the bean methods.
private String clientIdPrefix;
// Value passed as the String constructor argument of the message mapper.
private String modelPackages;
/**
 * Creates the Paho client factory used by the MQTT adapters of this configuration.
 *
 * <p>The connect options are populated from the {@code serverUris}, {@code username},
 * {@code password} and {@code keepAliveInterval} fields of this class.
 *
 * @return a {@link DefaultMqttPahoClientFactory} carrying those connect options
 */
@Bean
public MqttPahoClientFactory mqttClientFactory() {
    // Assemble the connection settings first ...
    MqttConnectOptions connectOptions = new MqttConnectOptions();
    connectOptions.setServerURIs(serverUris);
    connectOptions.setUserName(username);
    connectOptions.setPassword(password);
    connectOptions.setKeepAliveInterval(keepAliveInterval);

    // ... then attach them to a fresh factory.
    DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
    clientFactory.setConnectionOptions(connectOptions);
    return clientFactory;
}
/**
 * Creates the MQTT message converter backed by the configured {@code messageMapper} class.
 *
 * <p>The mapper class is instantiated through its public constructor taking a single
 * {@code String}, which receives {@code modelPackages}.
 *
 * @return a {@code BytesMessageConverter} wrapping the instantiated mapper
 * @throws NoSuchMethodException if the mapper class has no public {@code (String)} constructor
 */
@Bean
public MqttMessageConverter bytesMessageConverter() throws NoSuchMethodException {
    BytesMessageMapper mapper = BeanUtils.instantiateClass(
            messageMapper.getConstructor(String.class),
            modelPackages);
    return new BytesMessageConverter(mapper);
}
/**
 * Creates the outbound MQTT handler, registered as a service activator on
 * {@code "mqttOutboundChannel"}.
 *
 * <p>The handler uses the client id {@code clientIdPrefix + "_outbound"}, the client
 * factory from {@link #mqttClientFactory()}, async mode and a completion timeout of 5000.
 *
 * @param mqttMessageConverter converter installed on the handler
 * @return the configured {@link MqttPahoMessageHandler}
 */
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound(MqttMessageConverter mqttMessageConverter) {
    String outboundClientId = clientIdPrefix + "_outbound";
    MqttPahoMessageHandler handler =
            new MqttPahoMessageHandler(outboundClientId, mqttClientFactory());
    handler.setAsync(true);
    handler.setCompletionTimeout(5000);
    handler.setConverter(mqttMessageConverter);
    return handler;
}
/**
 * Creates the inbound MQTT channel adapter for the configured {@code subTopics}.
 *
 * <p>The adapter uses the client id {@code clientIdPrefix + "_inbound"}, the client
 * factory from {@link #mqttClientFactory()}, QoS 1 and a completion timeout of 5000,
 * and sends to the channel returned by {@link #mqttInboundChannel()}.
 *
 * @param mqttMessageConverter converter installed on the adapter
 * @return the configured {@link MqttPahoMessageDrivenChannelAdapter}
 */
@Bean
public MessageProducer mqttInbound(MqttMessageConverter mqttMessageConverter) {
    String inboundClientId = clientIdPrefix + "_inbound";
    MqttPahoMessageDrivenChannelAdapter inboundAdapter =
            new MqttPahoMessageDrivenChannelAdapter(inboundClientId, mqttClientFactory(), subTopics);
    inboundAdapter.setOutputChannel(mqttInboundChannel());
    inboundAdapter.setConverter(mqttMessageConverter);
    inboundAdapter.setQos(1);
    inboundAdapter.setCompletionTimeout(5000);
    return inboundAdapter;
}
/**
 * Declares the channel bean named {@code mqttOutboundChannel}, the input channel
 * referenced by the {@code @ServiceActivator} on {@code mqttOutbound}.
 *
 * @return a new {@link DirectChannel}
 */
@Bean
public MessageChannel mqttOutboundChannel() {
    MessageChannel outboundChannel = new DirectChannel();
    return outboundChannel;
}
/**
 * Declares the channel bean named {@code mqttInboundChannel}, which {@code mqttInbound}
 * sets as the output channel of the inbound adapter.
 *
 * @return a new {@link DirectChannel}
 */
@Bean
public MessageChannel mqttInboundChannel() {
    MessageChannel inboundChannel = new DirectChannel();
    return inboundChannel;
}
MqttClient client = new MqttClient(serverUrl, clientId, new MemoryPersistence());
With the plain Paho client shown above, multiple MqttClient instances can be created, one per clientId, and kept in a HashMap. How can the same effect be achieved with spring-integration-mqtt?
How do I create multiple MQTT clients in response to HTTP requests and keep them in a HashMap?
I am aware that IntegrationFlowContext supports dynamic registration of flows: flowContext.registration(flow).register();
But I still do not know how to apply it here. How can each MQTT client be maintained and managed so that one of them can be selected, by its HashMap key, to publish data?