0

Received messages are showing payload as null.

Publishing and consuming is working. I can see the logs printing getting triggered whenever there is a publish and consumption. Messages are produced every second.

I think the issue might be with the serialization. Im not sure though. I have a pojo of class "Message" as payload. This is my config class:

package com.example.demo.config;

import com.example.demo.model.Message;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.internals.ConsumerFactory;
import reactor.kafka.receiver.internals.DefaultKafkaReceiver;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

import javax.swing.*;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfiguration {

    private static final String TOPIC = "data-store";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String CLIENT_ID_CONFIG = "webflux-client";
    private static final String GROUP_ID_CONFIG = "webflux-group";

    @Bean
    public KafkaReceiver kafkaReceiver(){

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_CONFIG);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Message.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

        return new DefaultKafkaReceiver(ConsumerFactory.INSTANCE, ReceiverOptions.create(props).subscription(Collections.singleton(TOPIC)));
    }

    @Bean
    public KafkaSender<String, Message> kafkaSender(){
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID_CONFIG);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Message.class.getName());

        SenderOptions<String, Message> senderOptions = SenderOptions.create(props);
        return KafkaSender.create(senderOptions);

    }
}


This is my Producer controller :

package com.example.demo.controller;

import com.example.demo.model.Message;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@RestController
@RequiredArgsConstructor
public class KafkaProducerController {

    private static final String TOPIC = "data-store";

    private final KafkaSender<String, Message> sender;

    @GetMapping("/kafka/stop-generation")
    public void close(){
        sender.close();
    }

    @GetMapping("/kafka/generate-messages")
    public void generateMessages() throws InterruptedException {
        sender.send(generateFlux()
                .map(i -> SenderRecord.create(new ProducerRecord<>(TOPIC,"Key" + i,i), i)))
                .doOnError(error -> System.out.println("Send failed" + error))
                .subscribe(record -> {
                    System.out.println("::: " + record.recordMetadata().toString() + ":::" + record.correlationMetadata());
                   
                });
        }

    public Flux<Message> generateFlux() {
        return Flux.interval(Duration.ofSeconds(1))
                .map(sequence -> generateMessage());
    }

    public Message generateMessage() {
        double id = Math.random() * 100 ;
        return new Message((int) id, "Message " + id, LocalDateTime.now().toString());
    }

}

This is my consumer :


@RestController
public class KafkaReceiverController {

    @Autowired
    private KafkaReceiver<String,Message> kafkaReceiver;

    @GetMapping(value = "/kafka/get-messages", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public void getMessageFlux() {
        Flux<ReceiverRecord<String, Message>> kafkaFlux = kafkaReceiver.receive();

         kafkaFlux
                 .doOnNext(record -> System.out.println(" received :: " + record.value()))
                 .subscribe();


    }
}

My Message class :

package com.example.demo.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message implements Serializer, Deserializer {

    private int id;
    private String content;
    private String timestamp;

    @Override
    public void configure(Map configs, boolean isKey) {
        Serializer.super.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String topic, Object data) {
        return new byte[0];
    }

    @Override
    public byte[] serialize(String topic, Headers headers, Object data) {
        return Serializer.super.serialize(topic, headers, data);
    }

    @Override
    public Object deserialize(String topic, byte[] data) {
        return null;
    }

    @Override
    public Object deserialize(String topic, Headers headers, byte[] data) {
        return Deserializer.super.deserialize(topic, headers, data);
    }

    @Override
    public void close() {
        Serializer.super.close();
    }
}

EDIT 1: I overrode the deserialisation mechanism but it is failing with the below error:

@Override
    public Message deserialize(String topic, byte[] data) {
        ByteArrayInputStream byteStream = new ByteArrayInputStream(data);
        ObjectInputStream objectStream = new ObjectInputStream(byteStream);
        Object obj = objectStream.readObject();
        objectStream.close();
        return (Message) obj;
    }

Error: IT fails at ObjecinputStream call


reactor.core.Exceptions$ErrorCallbackNotImplemented: java.io.StreamCorruptedException: invalid stream header: 4D657373
Caused by: java.io.StreamCorruptedException: invalid stream header: 4D657373
    at java.base/java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:963) ~[na:na]
    at java.base/java.io.ObjectInputStream.<init>(ObjectInputStream.java:397) ~[na:na]
    at com.example.demo.model.Message.deserialize(Message.java:31) ~[main/:na]
    at com.example.demo.model.Message.deserialize(Message.java:19) ~[main/:na]

I modified Message class to include serialisation and deserialisation:

@AllArgsConstructor
@NoArgsConstructor
public class Message implements Serializable, Serializer<Message>, Deserializer<Message> {

    private int id;
    private String content;
    private String timestamp;

    @SneakyThrows
    @Override
    public Message deserialize(String topic, byte[] data) {
        ByteArrayInputStream byteStream = new ByteArrayInputStream(data);
        ObjectInputStream objectStream = new ObjectInputStream(byteStream);
        Object obj = objectStream.readObject();
        objectStream.close();
        return (Message) obj;
    }

    @Override
    public Message deserialize(String topic, Headers headers, byte[] data) {
        return Deserializer.super.deserialize(topic, headers, data);
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Serializer.super.configure(configs, isKey);
    }

    @SneakyThrows
    @Override
    public byte[] serialize(String topic, Message data) {
        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
        ObjectOutputStream objectStream = new ObjectOutputStream(byteStream);
        objectStream.writeObject(data);
        objectStream.close();
        byte[] byteArray = byteStream.toByteArray();
        byteStream.close();
        return byteArray;
    }

    @Override
    public byte[] serialize(String topic, Headers headers, Message data) {
        return Serializer.super.serialize(topic, headers, data);
    }

    @Override
    public void close() {
        Serializer.super.close();
    }
}

Now the error during deserialisation is :

Caused by: java.io.StreamCorruptedException: invalid stream header: 4D657373

user1354825
  • 1,108
  • 4
  • 21
  • 54

1 Answers1

0

You are right, you are manually returning null from you deserializer, which is Message class:

@Override
public Object deserialize(String topic, byte[] data) {
    return null;
}

@Override
public Object deserialize(String topic, Headers headers, byte[] data) {
    return Deserializer.super.deserialize(topic, headers, data);
}

Implement actual deserialization instead of returning null here. Second method calls Deserializer.super.deserialize which forwards to the first one according to its implementation:

default T deserialize(String topic, Headers headers, byte[] data) {
    return this.deserialize(topic, data);
}

so you get null in any case. Common practice is to override only deserialize(String topic, byte[] data) if you don't need headers. deserialize(String topic, Headers headers, byte[] data) would use it by default as shown above.

gdomo
  • 1,650
  • 1
  • 9
  • 18
  • Thanks. I added my own serializer and deserilalizer logic. Serialization seems to be working but deserialisation is failing. – user1354825 Jun 22 '23 at 10:21
  • I modified message class as well. – user1354825 Jun 22 '23 at 10:21
  • Looks like your object is not properly serialized. Please provide serialization code – gdomo Jun 22 '23 at 13:00
  • i added serialisation and deserialisation code in message class but now it fails with a different error as updated in the question – user1354825 Jun 22 '23 at 13:17
  • 1) include [serialVersionUuid](https://stackoverflow.com/questions/285793/what-is-a-serialversionuid-and-why-should-i-use-it), it's always a good practice 2) purge/recreate topic, to make sure you are trying to deserialize only newly created messages – gdomo Jun 22 '23 at 13:27