
My application is a Kafka consumer which receives a big fat custom message from the producer. We use Jackson to serialize and deserialize the messages.

Here is a simplified version of my consumer.

    public class LittleCuteConsumer {

        @KafkaListener(topics = "${kafka.bigfat.topic}", containerFactory = "littleCuteConsumerFactory")
        public void receive(BigFatMessage message) {
            // do cute stuff
        }
    }

And this is the message being transferred:

    @JsonIgnoreProperties(ignoreUnknown = true)
    public class BigFatMessage {
        private String fieldOne;
        private String fieldTwo;
        ...
        private String fieldTen;

        private CustomeFieldOne cf1;
        ...
        private CustomeFieldTen cf10;

        // setters and getters
    }

Here is the object I want to deserialize the original message to.

    @JsonIgnoreProperties(ignoreUnknown = true)
    public class ThinMessage {
        private String fieldOne;
        private String fieldTwo;

        // setters and getters
    }

Original deserializer

public class BigFatDeserializer implements Deserializer<BigFatMessage> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Default implementation of configure method
    }

    @Override
    public BigFatMessage deserialize(String topic, byte[] data) {
        ObjectMapper mapper = new ObjectMapper();
        BigFatMessage biggie = null;
        try {
            biggie = mapper.readValue(data, BigFatMessage.class);
        } catch (Exception e) {
            // blame others
        }
        return biggie;
    }

    @Override
    public void close() {
        // Default implementation of close method
    }
}

As we can see here, the message contains a lot of fields and dependent objects which are actually useless for my consumer, and I don't want to define all the dependent classes in my consumer as well.

Hence, I need a way to receive the message using a different, simpler model class, and to deserialize it so that the unnecessary fields of the original message are ignored.

How I'm trying to deserialize

public class ThinDeserializer implements Deserializer<ThinMessage> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Default implementation of configure method
    }

    @Override
    public ThinMessage deserialize(String topic, byte[] data) {
        ObjectMapper mapper = new ObjectMapper();
        ThinMessage cutie = null;
        try {
            cutie = mapper.readValue(data, ThinMessage.class);
        } catch (Exception e) {
            // blame others
        }
        return cutie;
    }

    @Override
    public void close() {
        // Default implementation of close method
    }
}

And I get the following Jackson error:

com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of com.myapp.ThinMessage (no Creators, like default construct, exist): cannot deserialize from Object value (no delegate- or property-based Creator)

It is accompanied by the following Spring Kafka exceptions:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message

org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 
Aryan Venkat
  • On the wire it is just JSON. You can deserialize it however you want. Just need to configure the parser to ignore unmapped fields (which they normally automatically do) – Thilo Aug 29 '19 at 11:10
  • Possible duplicate of [Ignoring new fields on JSON objects using Jackson](https://stackoverflow.com/questions/5455014/ignoring-new-fields-on-json-objects-using-jackson) – daniu Aug 29 '19 at 11:28
  • @daniu `@JsonIgnoreProperties(ignoreUnknown = true)` didn't work for me. – Aryan Venkat Aug 29 '19 at 11:35
  • Which JSON library are you using? Each library may have its own annotation similar to `@JsonIgnoreProperties(ignoreUnknown = true)`. –  Aug 29 '19 at 11:56
  • @lssilva I'm using the same Jackson library. When I try to deserialize the original message object into my smaller message, I get an exception. I'll edit the question now. – Aryan Venkat Aug 29 '19 at 12:08
  • Is your ThinMessage class a Java bean, with an empty constructor and getters and setters? –  Aug 29 '19 at 13:18
  • @lssilva Correct! – Aryan Venkat Aug 29 '19 at 13:19
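
The first comment's point (on the wire it is just JSON) can be checked outside Kafka. Below is a minimal sketch, assuming `jackson-databind` is on the classpath. The class name `ThinMessageDemo` and the sample payload are made up for illustration; only `ThinMessage` and its two fields come from the question.

```java
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class ThinMessageDemo {

    // Same shape as the ThinMessage in the question. It is declared static so
    // that Jackson can instantiate it through the implicit no-argument constructor.
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class ThinMessage {
        private String fieldOne;
        private String fieldTwo;

        public String getFieldOne() { return fieldOne; }
        public void setFieldOne(String fieldOne) { this.fieldOne = fieldOne; }
        public String getFieldTwo() { return fieldTwo; }
        public void setFieldTwo(String fieldTwo) { this.fieldTwo = fieldTwo; }
    }

    // Parses raw bytes, the same input a Kafka Deserializer receives.
    public static ThinMessage parse(byte[] data) {
        try {
            return new ObjectMapper().readValue(data, ThinMessage.class);
        } catch (IOException e) {
            // Surface the failure instead of returning null.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical payload: fieldTen and cf1 have no counterpart in ThinMessage.
        String json = "{\"fieldOne\":\"a\",\"fieldTwo\":\"b\",\"fieldTen\":\"j\",\"cf1\":{\"x\":1}}";
        ThinMessage thin = parse(json.getBytes(StandardCharsets.UTF_8));
        System.out.println(thin.getFieldOne() + " " + thin.getFieldTwo());
    }
}
```

If a check like this passes in isolation while the listener still fails, that would suggest the problem lies in how the record is converted on the consumer side rather than in the model class itself.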

2 Answers


Try to change

public class ThinMessage {
   private String fieldOne;
   private String fieldTwo;
}

to

@JsonIgnoreProperties(ignoreUnknown = true)
public class ThinMessage {
    private String fieldOne;
    private String fieldTwo;

    public ThinMessage() {
    }

    public String getFieldOne() {
        return fieldOne;
    }

    public void setFieldOne(String fieldOne) {
        this.fieldOne = fieldOne;
    }

    public String getFieldTwo() {
        return fieldTwo;
    }

    public void setFieldTwo(String fieldTwo) {
        this.fieldTwo = fieldTwo;
    }
}

and set

ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
  • I have the same model, sorry if I didn't make that clear. Updated the question. – Aryan Venkat Aug 29 '19 at 13:22
  • Do you also have `objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);`? –  Aug 29 '19 at 13:27
  • I tried adding this and still get the same error. There is another exception accompanying the first; I'm editing the question. – Aryan Venkat Aug 29 '19 at 14:24
  • On your Kafka consumer, did you set the deserializers? `props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);` –  Aug 30 '19 at 07:11
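
On the no-argument constructor discussed in this answer: Jackson's "no Creators, like default construct, exist" message means it found neither a no-argument constructor nor an annotated creator on the class it was asked to build. One cause that is easy to overlook is a model declared as a non-static inner class, whose constructor implicitly takes the outer instance. Whether that applies here is not known from the question. The sketch below uses plain reflection to test for a no-argument constructor; `CreatorCheck`, `Nested` and `Inner` are hypothetical names.

```java
public class CreatorCheck {

    // Static nested class: has a true no-argument constructor.
    public static class Nested {
        private String fieldOne;
    }

    // Non-static inner class: its constructor takes a hidden CreatorCheck argument.
    public class Inner {
        private String fieldOne;
    }

    // Returns true if the class declares a constructor with zero parameters.
    public static boolean hasNoArgConstructor(Class<?> type) {
        try {
            type.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Nested: " + hasNoArgConstructor(Nested.class));
        System.out.println("Inner:  " + hasNoArgConstructor(Inner.class));
    }
}
```

Running the same check against the actual `ThinMessage` class on the consumer's classpath is a quick way to rule this cause in or out.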

Check this link: https://docs.spring.io/spring-kafka/docs/2.3.x/reference/html/#json

You have two options: remove the type info on the producer side, or ignore the type info on the consumer side.

@Bean
public DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf(KafkaProperties properties) {
    Map<String, Object> props = properties.buildProducerProperties();
    // Producer side: do not write type information into the record headers.
    return new DefaultKafkaProducerFactory<>(props,
            new JsonSerializer<MyKeyType>()
                    .forKeys()
                    .noTypeInfo(),
            new JsonSerializer<MyValueType>()
                    .noTypeInfo());
}

@Bean
public DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf(KafkaProperties properties) {
    Map<String, Object> props = properties.buildConsumerProperties();
    // Consumer side: ignore any type headers and always map to the given classes.
    return new DefaultKafkaConsumerFactory<>(props,
            new JsonDeserializer<>(MyKeyType.class)
                    .forKeys()
                    .ignoreTypeHeaders(),
            new JsonDeserializer<>(MyValueType.class)
                    .ignoreTypeHeaders());
}
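
The consumer-side option can probably also be expressed as plain consumer properties instead of deserializer instances. This is a sketch under assumptions: the keys `spring.json.use.type.headers` and `spring.json.value.default.type` are, as far as I know, the ones read by Spring Kafka's `JsonDeserializer`, but check them against the version in use. `com.myapp.ThinMessage` is taken from the error message in the question; the `ThinConsumerProps` class, the group id and the `String` key deserializer are placeholders.

```java
import java.util.HashMap;
import java.util.Map;

public class ThinConsumerProps {

    // Builds properties that could be handed to a DefaultKafkaConsumerFactory.
    public static Map<String, Object> build(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "thin-consumer"); // placeholder group id
        // Assumption: record keys are plain strings.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        // Ignore the producer's type headers (assumed key, see the note above).
        props.put("spring.json.use.type.headers", "false");
        // Always map the JSON payload onto the thin model.
        props.put("spring.json.value.default.type", "com.myapp.ThinMessage");
        return props;
    }

    public static void main(String[] args) {
        build("localhost:9092").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With either approach the producer can keep sending `BigFatMessage`, and the consumer only needs the `ThinMessage` class.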