I am working on a POC for implementing a kafka cluster in my project. I have setup a kafka cluster in my local machine with 3 brokers. Now I am sending messages to the Kafka server using Spring MVC REST service which is internally using Spring Kafka to produce and consume messages to and from the Kafka cluster. Now I need to send files (containing XML data) to the Kafka server. I tried using Kafka's ByteArraySerializer/Deserializer to do this. I am converting the XMl file to a byteArray and sending it to the Kafka server but my byteArray is getting converted to an array of numbers. I am pasting the code that I am using to send the message to the kafka server.
@Async
public String sendMessageFromFile(String filePath) {
File file = new File(filePath);
byte[] b = new byte[(int) file.length()];
try {
FileInputStream fileInputStream = new FileInputStream(file);
fileInputStream.read(b);
for (int i = 0; i < b.length; i++) {
System.out.print((char)b[i]);
}
ListenableFuture<SendResult<Integer, byte[]>> future = kafkaTemplate.send("TESTQUEUE", b);//Sending bytearray to the kafka cluster
future.addCallback(new ListenableFutureCallback<SendResult<Integer, byte[]>>() {
@Override
public void onSuccess(final SendResult<Integer, byte[]> message) {
LOGGER.info("sent message= " + Arrays.toString(message.getProducerRecord().value()) + " with offset= " + message.getRecordMetadata().offset());
}
@Override
public void onFailure(final Throwable throwable) {
LOGGER.error("unable to send message from file with path= " + filePath, throwable);
}
});
fileInputStream.close();
}catch(FileNotFoundException e) {
LOGGER.error("Invalid file path");
return "Invalid file path";
}catch(IOException e) {
LOGGER.error("An error occured while reading the file: "+e.getMessage());
return "An error occured while parsing the file";
}
return "File is being sent";
}
Below is my configuration.
public class MessagingServiceConfiguration {
private final String bootstrapServersProducer = "localhost:9092";
private final String bootstrapServersConsumer = "localhost:9093";
@Autowired
private Environment env;
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, byte[]>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, byte[]> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<Integer, byte[]> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConsumer);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
return props;
}
@Bean
public ListenerServiceImpl listener() {
return new ListenerServiceImpl();
}
@Bean
public ProducerFactory<Integer, byte[]> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersProducer);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
return props;
}
@Bean
public KafkaTemplate<Integer, byte[]> kafkaTemplate() {
return new KafkaTemplate<Integer, byte[]>(producerFactory());
}
}
Below is my consumer code:
@KafkaListener(id = "TEST_CONSUMER_ID", topics = "TESTQUEUE")
public void listenMessageInQueue(byte[] msg) {
LOGGER.info("receiving payload='{}'", msg);
messageDao.saveMessage(msg.toString());
}
Below is the message i am getting back from kafka.
[60, 65, 100, 118, 97, 110, 99, 101, 83, 104, 105, 112, 109, 101, 110, 116, 78, 111, 116, 105, 102, 105, 99, 97, 116, 105, 111, 110, 32, 120, 109, 108, 110, 115, 58, 100, 115, 102, 114, 61, 34, 117, 114, 110, 58, 114, 111, 115, 101, 116, 116, 97, 110, 101, 116, 58, 115, 112, 101, 99, 105, 102, 105, 99, 97, 116, 105, 111, 110, 58, 100, 111, 109, 97, 105, 110, 58, 80, 114, 111, 99, 117, 114, 101,......
I am unable to find out what i am missing in this. Could some one point me in the right direction for accomplishing this task.