
I am working on a POC for implementing a Kafka cluster in my project. I have set up a Kafka cluster on my local machine with 3 brokers. I am sending messages to the Kafka cluster through a Spring MVC REST service, which internally uses Spring Kafka to produce and consume messages. Now I need to send files (containing XML data) to the Kafka server. I tried using Kafka's ByteArraySerializer/ByteArrayDeserializer for this: I convert the XML file to a byte array and send it, but the byte array comes back as an array of numbers. Below is the code I am using to send the message to the Kafka server.

@Async
    public String sendMessageFromFile(String filePath) {
        File file = new File(filePath);
        byte[] b = new byte[(int) file.length()];
        try (FileInputStream fileInputStream = new FileInputStream(file)) {
            fileInputStream.read(b); // note: a single read() is not guaranteed to fill the buffer
            for (int i = 0; i < b.length; i++) {
                System.out.print((char) b[i]);
            }
            // Sending the byte array to the Kafka cluster
            ListenableFuture<SendResult<Integer, byte[]>> future = kafkaTemplate.send("TESTQUEUE", b);
            future.addCallback(new ListenableFutureCallback<SendResult<Integer, byte[]>>() {

                @Override
                public void onSuccess(final SendResult<Integer, byte[]> message) {
                    LOGGER.info("sent message= " + Arrays.toString(message.getProducerRecord().value())
                            + " with offset= " + message.getRecordMetadata().offset());
                }

                @Override
                public void onFailure(final Throwable throwable) {
                    LOGGER.error("unable to send message from file with path= " + filePath, throwable);
                }
            });
        } catch (FileNotFoundException e) {
            LOGGER.error("Invalid file path");
            return "Invalid file path";
        } catch (IOException e) {
            LOGGER.error("An error occurred while reading the file: " + e.getMessage());
            return "An error occurred while parsing the file";
        }
        return "File is being sent";
    }
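As an aside on the file-reading part above: a single `FileInputStream.read(b)` call is not guaranteed to fill the whole buffer, so large files could be silently truncated. A safer sketch using `java.nio.file.Files` (class and file names here are illustrative, not part of my project):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class FileBytes {
    // Read the whole file in one call; unlike a single InputStream.read(buf),
    // Files.readAllBytes is guaranteed to return every byte of the file.
    static byte[] readFile(String filePath) throws IOException {
        return Files.readAllBytes(Paths.get(filePath));
    }

    public static void main(String[] args) throws IOException {
        // Demo with a temporary file standing in for the real XML payload
        Path tmp = Files.createTempFile("payload", ".xml");
        Files.write(tmp, "<test/>".getBytes());
        System.out.println(new String(readFile(tmp.toString())));
    }
}
```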

Below is my configuration.

public class MessagingServiceConfiguration {

    private final String bootstrapServersProducer = "localhost:9092";
    private final String bootstrapServersConsumer = "localhost:9093";

    @Autowired
    private Environment env;

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, byte[]> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, byte[]> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, byte[]> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConsumer);
        // Key deserializer must match the Integer key type declared above
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        return props;
    }

    @Bean
    public ListenerServiceImpl listener() {
        return new ListenerServiceImpl();
    }

    @Bean
    public ProducerFactory<Integer, byte[]> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersProducer);
        // Key serializer must match the Integer key type used by the template
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        return props;
    }

    @Bean
    public KafkaTemplate<Integer, byte[]> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
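Since the payload is always XML text, an alternative to `byte[]` would be to send it as a `String`. A minimal sketch of the extra beans that would need (bean and method names here are illustrative):

```java
// Sketch: String-valued producer beans, assuming the same bootstrap server as above.
@Bean
public ProducerFactory<Integer, String> stringProducerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersProducer);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    // StringSerializer writes the XML as UTF-8 text, so no manual byte handling is needed
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(props);
}

@Bean
public KafkaTemplate<Integer, String> stringKafkaTemplate() {
    return new KafkaTemplate<>(stringProducerFactory());
}
```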

Below is my consumer code:

@KafkaListener(id = "TEST_CONSUMER_ID", topics = "TESTQUEUE")
    public void listenMessageInQueue(byte[] msg) {
        LOGGER.info("receiving payload='{}'", msg);
        messageDao.saveMessage(msg.toString());     
    }

Below is the message I am getting back from Kafka.

[60, 65, 100, 118, 97, 110, 99, 101, 83, 104, 105, 112, 109, 101, 110, 116, 78, 111, 116, 105, 102, 105, 99, 97, 116, 105, 111, 110, 32, 120, 109, 108, 110, 115, 58, 100, 115, 102, 114, 61, 34, 117, 114, 110, 58, 114, 111, 115, 101, 116, 116, 97, 110, 101, 116, 58, 115, 112, 101, 99, 105, 102, 105, 99, 97, 116, 105, 111, 110, 58, 100, 111, 109, 97, 105, 110, 58, 80, 114, 111, 99, 117, 114, 101,......

I am unable to find out what I am missing here. Could someone point me in the right direction for accomplishing this task?

Venu
  • If you have xml content, why don't you use `StringSerializer`? – daniu Dec 20 '18 at 14:28
  • @daniu Do you mean to say that I should convert my byte array to a string and send it? – Venu Dec 20 '18 at 14:33
  • 2
    `is getting converted to an array of numbers` Where? At the consumer? Show your consumer (`@KafkaListener`); sending and receiving `byte[]` should work ok with the BAS and BAD. – Gary Russell Dec 20 '18 at 14:39
  • Yes, I tried to print it at the consumer end as well as in the `onSuccess` callback method at the producer end. I have added the consumer code as well. – Venu Dec 20 '18 at 14:44
  • 1
    These "numbers" are hex values of the bytes you've sent – OneCricketeer Dec 20 '18 at 14:59
  • 1
    Note that internally, Kafka StringDeserializer is doing the exact same thing as the duplicate post, so you should probably just use that instead – OneCricketeer Dec 20 '18 at 15:01
  • @cricket_007 You are right. That was my problem. After adding the conversion/StringDeserializer it is working fine now. Thank you so much for helping me out. – Venu Dec 20 '18 at 16:00
  • One other thing that I might recommend looking at is using Kafka Connect for transforming the XML into a binary structured object - https://github.com/jcustenborder/kafka-connect-transform-xml Otherwise, each of the `` elements adds overhead, and Kafka by default has a max message size of 1MB – OneCricketeer Dec 20 '18 at 16:44
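To illustrate the fix from the comments: the "numbers" are simply each byte's decimal value as rendered by `Arrays.toString`, and decoding the `byte[]` with `new String(msg, StandardCharsets.UTF_8)` recovers the XML text. A minimal, self-contained sketch (class name is illustrative):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PayloadDecoding {
    // Turn a Kafka byte[] payload back into the XML text it came from.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = "<AdvanceShipmentNotification".getBytes(StandardCharsets.UTF_8);
        // Arrays.toString renders each byte as its decimal value -- the "numbers" seen in the log:
        System.out.println(Arrays.toString(payload)); // [60, 65, 100, 118, ...]
        // Decoding the bytes restores the original text:
        System.out.println(decode(payload)); // <AdvanceShipmentNotification
    }
}
```

Equivalently, configuring `StringDeserializer` as the value deserializer performs this same decoding inside the consumer.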

0 Answers