Received messages are showing the payload as null.
Publishing and consuming are otherwise working: I can see the log statements fire on every publish and every consume, and messages are produced every second.
I think the issue might be with the serialization, but I'm not sure. The payload is a POJO of class Message. This is my config class:
package com.example.demo.config;
import com.example.demo.model.Message;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.internals.ConsumerFactory;
import reactor.kafka.receiver.internals.DefaultKafkaReceiver;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfiguration {

    private static final String TOPIC = "data-store";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String CLIENT_ID_CONFIG = "webflux-client";
    private static final String GROUP_ID_CONFIG = "webflux-group";

    @Bean
    public KafkaReceiver kafkaReceiver() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_CONFIG);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Message.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return new DefaultKafkaReceiver(ConsumerFactory.INSTANCE,
                ReceiverOptions.create(props).subscription(Collections.singleton(TOPIC)));
    }

    @Bean
    public KafkaSender<String, Message> kafkaSender() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID_CONFIG);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Message.class.getName());
        SenderOptions<String, Message> senderOptions = SenderOptions.create(props);
        return KafkaSender.create(senderOptions);
    }
}
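Side note: I import spring-kafka's JsonSerializer/JsonDeserializer above but never actually use them. If the problem is the value (de)serializer config, I assume a JSON-based setup would mean replacing the value (de)serializer lines in the two beans above with something roughly like this (untested sketch on my part; the JsonDeserializer.VALUE_DEFAULT_TYPE and TRUSTED_PACKAGES property names are from the spring-kafka docs):

    // Untested sketch: value (de)serialization via spring-kafka's JSON support
    // instead of pointing VALUE_(DE)SERIALIZER_CLASS_CONFIG at Message itself.
    // Consumer side:
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Message.class.getName());   // target type for the JSON payload
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.demo.model");    // whitelist my model package
    // Producer side:
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());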
This is my producer controller:
package com.example.demo.controller;
import com.example.demo.model.Message;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;
import java.time.Duration;
import java.time.LocalDateTime;
@RestController
@RequiredArgsConstructor
public class KafkaProducerController {

    private static final String TOPIC = "data-store";

    private final KafkaSender<String, Message> sender;

    @GetMapping("/kafka/stop-generation")
    public void close() {
        sender.close();
    }

    @GetMapping("/kafka/generate-messages")
    public void generateMessages() {
        sender.send(generateFlux()
                .map(msg -> SenderRecord.create(new ProducerRecord<>(TOPIC, "Key" + msg, msg), msg)))
                .doOnError(error -> System.out.println("Send failed: " + error))
                .subscribe(record -> System.out.println("::: " + record.recordMetadata() + " ::: " + record.correlationMetadata()));
    }

    public Flux<Message> generateFlux() {
        return Flux.interval(Duration.ofSeconds(1))
                .map(sequence -> generateMessage());
    }

    public Message generateMessage() {
        double id = Math.random() * 100;
        return new Message((int) id, "Message " + id, LocalDateTime.now().toString());
    }
}
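Aside: because generateMessages() subscribes internally, the only handle I have to stop the stream is closing the sender entirely. I assume keeping the Disposable would be cleaner; this is just my sketch, not the code under test:

    // Sketch (my assumption): keep a handle to the running subscription
    private Disposable generation;   // reactor.core.Disposable

    @GetMapping("/kafka/generate-messages")
    public void generateMessages() {
        generation = sender.send(generateFlux()
                .map(msg -> SenderRecord.create(new ProducerRecord<>(TOPIC, "Key" + msg, msg), msg)))
                .subscribe(result -> System.out.println("::: " + result.recordMetadata()));
    }

    @GetMapping("/kafka/stop-generation")
    public void close() {
        if (generation != null) {
            generation.dispose();   // stop the interval without closing the sender
        }
    }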
This is my consumer:
@RestController
public class KafkaReceiverController {

    @Autowired
    private KafkaReceiver<String, Message> kafkaReceiver;

    @GetMapping(value = "/kafka/get-messages", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public void getMessageFlux() {
        Flux<ReceiverRecord<String, Message>> kafkaFlux = kafkaReceiver.receive();
        kafkaFlux
                .doOnNext(record -> System.out.println(" received :: " + record.value()))
                .subscribe();
    }
}
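(I know that, as written, this endpoint returns nothing to the HTTP client and only logs. Since it declares text/event-stream, I assume the idiomatic WebFlux form would return the Flux and let the framework subscribe, roughly like below, but that's not the issue I'm asking about:)

    @GetMapping(value = "/kafka/get-messages", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Message> getMessageFlux() {
        return kafkaReceiver.receive()
                .doOnNext(record -> System.out.println(" received :: " + record.value()))
                .map(ReceiverRecord::value);   // stream the payloads to the client
    }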
My Message class:
package com.example.demo.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message implements Serializer, Deserializer {

    private int id;
    private String content;
    private String timestamp;

    @Override
    public void configure(Map configs, boolean isKey) {
        Serializer.super.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String topic, Object data) {
        return new byte[0];
    }

    @Override
    public byte[] serialize(String topic, Headers headers, Object data) {
        return Serializer.super.serialize(topic, headers, data);
    }

    @Override
    public Object deserialize(String topic, byte[] data) {
        return null;
    }

    @Override
    public Object deserialize(String topic, Headers headers, byte[] data) {
        return Deserializer.super.deserialize(topic, headers, data);
    }

    @Override
    public void close() {
        Serializer.super.close();
    }
}
EDIT 1: I realized that my serialize() stub returns an empty byte array and my deserialize() stub returns null (which would explain the null payloads), so I overrode the deserialization mechanism, but it is failing with the error below:
    @SneakyThrows
    @Override
    public Message deserialize(String topic, byte[] data) {
        ByteArrayInputStream byteStream = new ByteArrayInputStream(data);
        ObjectInputStream objectStream = new ObjectInputStream(byteStream);
        Object obj = objectStream.readObject();
        objectStream.close();
        return (Message) obj;
    }
Error: it fails at the ObjectInputStream constructor call:
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.io.StreamCorruptedException: invalid stream header: 4D657373
Caused by: java.io.StreamCorruptedException: invalid stream header: 4D657373
at java.base/java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:963) ~[na:na]
at java.base/java.io.ObjectInputStream.<init>(ObjectInputStream.java:397) ~[na:na]
at com.example.demo.model.Message.deserialize(Message.java:31) ~[main/:na]
at com.example.demo.model.Message.deserialize(Message.java:19) ~[main/:na]
I modified the Message class to implement serialization and deserialization:
@AllArgsConstructor
@NoArgsConstructor
public class Message implements Serializable, Serializer<Message>, Deserializer<Message> {

    private int id;
    private String content;
    private String timestamp;

    @SneakyThrows
    @Override
    public Message deserialize(String topic, byte[] data) {
        ByteArrayInputStream byteStream = new ByteArrayInputStream(data);
        ObjectInputStream objectStream = new ObjectInputStream(byteStream);
        Object obj = objectStream.readObject();
        objectStream.close();
        return (Message) obj;
    }

    @Override
    public Message deserialize(String topic, Headers headers, byte[] data) {
        return Deserializer.super.deserialize(topic, headers, data);
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Serializer.super.configure(configs, isKey);
    }

    @SneakyThrows
    @Override
    public byte[] serialize(String topic, Message data) {
        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
        ObjectOutputStream objectStream = new ObjectOutputStream(byteStream);
        objectStream.writeObject(data);
        objectStream.close();
        byte[] byteArray = byteStream.toByteArray();
        byteStream.close();
        return byteArray;
    }

    @Override
    public byte[] serialize(String topic, Headers headers, Message data) {
        return Serializer.super.serialize(topic, headers, data);
    }

    @Override
    public void close() {
        Serializer.super.close();
    }
}
Now the error during deserialization is still the same:
Caused by: java.io.StreamCorruptedException: invalid stream header: 4D657373
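One thing I noticed while debugging (my own observation, so it may be a red herring): the rejected header bytes decode to plain ASCII text rather than ObjectOutputStream's magic header 0xACED0005, which makes me suspect the consumer is replaying older records on the topic that were written in a different format (I consume with auto.offset.reset=earliest):

    // The four header bytes from the exception are readable text, i.e. the record
    // was not written by ObjectOutputStream (whose streams start with 0xACED0005).
    byte[] header = {0x4D, 0x65, 0x73, 0x73};
    System.out.println(new String(header, java.nio.charset.StandardCharsets.US_ASCII));   // prints "Mess"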