
I am developing an MQTT connector for Kafka using Spring.

Using the MQTT library provided by Spring, messages are collected as follows.

message handler

@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
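                // Read the header before calling toString() so a missing topic cannot throw a NullPointerException.
                Object receivedTopic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
                String topic = (receivedTopic != null) ? receivedTopic.toString() : "mqttdata";

                if ("myTopic".equals(topic)) {
                    System.out.println("Mqtt data pub");
                }
                System.out.println(message.getPayload());
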
                String tag = "test/vib";
                String name = KafkaMessageService.MQTT_PRODUCER;
                HashMap<String, Object> datalist = new HashMap<String, Object>();
                try {
                    datalist = convertJSONstringToMap(message.getPayload().toString());
                    System.out.println(datalist.get("mac"));
                    // Does not compile: datalist is a single HashMap, but publish() expects a HashMap<String,Object>[].
                    counts = kafkaMessageService.publish(topic, name, tag, (HashMap<String,Object>[]) datalist);
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        };
}

    public static HashMap<String,Object> convertJSONstringToMap(String json) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Parse the JSON payload straight into a map; no placeholder map is needed first.
        return mapper.readValue(json, new TypeReference<HashMap<String, Object>>() {});
    }

publish method

    public int publish(String topic,String producerName,String tag,HashMap<String,Object>[] datalist) throws NotMatchedProducerException,KafkaPubFailureException{
        KafkaProducerAdaptor adaptor = searchProducerAdaptor(producerName);
        if(adaptor==null) {
            throw new NotMatchedProducerException();
        }
        
        KafkaTemplate<String,Object> kafkaTemplate = adaptor.getKafkaTemplate();
        
        LocalDateTime currentDateTime = LocalDateTime.now();
        String receivedTime = currentDateTime.toString();
        
        ObjectMapper objectMapper = new ObjectMapper();
        
        String key = adaptor.getName();
        
        int counts = 0;
        for(HashMap<String,Object> data : datalist) {
            Map<String,Object> messagePacket = new HashMap<String,Object>();
            messagePacket.put("tag", tag);
            messagePacket.put("data", data);
            messagePacket.put("receivedtime", receivedTime);
            
            try {
                kafkaTemplate.send(topic,key,objectMapper.valueToTree(messagePacket)).get();
                logger.info("Sent message : topic=["+topic+"],key=["+key+"] value=["+messagePacket+"]");
            } catch(Exception e) {
                logger.info("Unable to send message : topic=["+topic+"],key=["+key+"] message=["+messagePacket+"] / due to : "+e.getMessage());
                throw new KafkaPubFailureException(e);
            }
            counts++;
        }
        
        return counts;
    }

I don't know how to create an instance of HashMap<String, Object>[] or how to use it.

The source above was taken from Spring support as is, with some modifications.

OneCricketeer
Harzarics
  • Why do you think you need a HashMap? You can send `message.getPayload().toString()` directly to a `KafkaProducer<String, String>`. Other than that, your IDE tells you exactly what `getPayload()` returns... Also, `spring-kafka` accepts `Message` objects directly – OneCricketeer Jan 13 '22 at 01:57
  • I've edited the post because it lacked information. Thank you, but my publish method needs a HashMap[], so I added the source code. What I need to know is how to declare and use an array of HashMaps. – Harzarics Jan 13 '22 at 04:11
  • Your publish method is your own definition. It can take anything. All I'm saying is that your kafkaTemplate should send strings, not a hashmap, and definitely not Object. And without parsing anything, `message.getPayload().toString()` is already a string that can directly be forwarded to Kafka... You don't need an array of hashmaps either; I suggest you create your own class type to parse the MQTT event into, then make an array of those. Take a minute to read the spring-kafka documentation and it shows how you can send a list of Message objects – OneCricketeer Jan 13 '22 at 15:46
  • Thanks. However, since the publish method is used in several classes, it is difficult to modify it. I'll read the spring-kafka documentation. – Harzarics Jan 14 '22 at 07:58
  • Does this answer your question? ["Cannot create generic array of .." - how to create an Array of Map?](https://stackoverflow.com/questions/14917375/cannot-create-generic-array-of-how-to-create-an-array-of-mapstring-obje) – Brian Tompsett - 汤莱恩 Jan 14 '22 at 10:51
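
For reference, the generic-array approach from the linked question can be sketched as below. This is only a minimal illustration, not the asker's or Spring's code: the class name `GenericArrayExample`, the helper `toArray`, and the sample "mac" value are all made up for the example. Java rejects `new HashMap<String, Object>[1]`, so the sketch creates a raw `HashMap[]` and casts it, which is unchecked but safe as long as only `HashMap<String, Object>` values are stored in it.

```java
import java.util.HashMap;

public class GenericArrayExample {

    // Hypothetical helper: wraps one map in a one-element array of the
    // type that a method such as publish(..., HashMap<String,Object>[]) expects.
    @SuppressWarnings("unchecked")
    public static HashMap<String, Object>[] toArray(HashMap<String, Object> single) {
        // "new HashMap<String, Object>[1]" does not compile (generic array creation),
        // so a raw array is created and cast to the parameterized array type.
        HashMap<String, Object>[] array = (HashMap<String, Object>[]) new HashMap[1];
        array[0] = single;
        return array;
    }

    public static void main(String[] args) {
        HashMap<String, Object> payload = new HashMap<>();
        payload.put("mac", "00:11:22:33:44:55"); // made-up sample value

        HashMap<String, Object>[] datalist = toArray(payload);

        // The array can be iterated the same way the publish method iterates it.
        for (HashMap<String, Object> data : datalist) {
            System.out.println(data.get("mac"));
        }
    }
}
```

Assuming the handler from the question, the parsed map could then be passed as `kafkaMessageService.publish(topic, name, tag, toArray(datalist))`. If the publish signature could be changed, a `List<Map<String, Object>>` parameter would avoid the unchecked cast altogether.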
