1

I'm new to Kafka. I have created two projects: the first acts as a producer, and the second as a consumer.

Note: if I create the model or DTO under the same package in both projects (for example, com.stackoverflow.model), it works, but it doesn't work if I create the model under a different package name in each project, as below:

  • The producer defines the package com.stackoverflow.model
  • The consumer defines the package com.stackoverflow.dto

Error logs:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition txn-hub-master-0 at offset 4. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.stackoverflow.dto.Demo' is not in the trusted packages: [java.util, java.lang, com.stackoverflow.mode, com.stackoverflow.mode.model.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:126) ~[spring-kafka-2.7.3.jar:2.7.3]
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:100) ~[spring-kafka-2.7.3.jar:2.7.3]

KafkaConsumerConfig Class

/**
 * Builds the base consumer settings: the bootstrap servers, the consumer
 * group id, and the JSON trusted-packages entry set to {@code "*"} (trust all).
 *
 * <p>NOTE(review): the trusted-packages entry is presumably only read by a
 * JSON deserializer; confirm that one is actually configured for the
 * consumer that uses this map.
 *
 * @return a new, mutable map that callers may extend with further properties
 */
public Map<String, Object> propsStringObjectMap() {
    Map<String, Object> settings = new HashMap<>();
    settings.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    settings.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    return settings;
}
/**
 * Creates the consumer factory bean from the base settings returned by
 * {@link #propsStringObjectMap()}, with {@code StringDeserializer} set for
 * both the record key and the record value.
 *
 * @return a consumer factory for {@code String} keys and {@code String} values
 */
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> consumerSettings = propsStringObjectMap();
    consumerSettings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerSettings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(consumerSettings);
}
  /**
   * Creates the listener container factory bean and wires it to the
   * consumer factory returned by {@link #consumerFactory()}.
   *
   * @return a listener container factory for {@code String} keys and values
   */
  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
      ConsumerFactory<String, String> source = consumerFactory();
      ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
              new ConcurrentKafkaListenerContainerFactory<>();
      containerFactory.setConsumerFactory(source);
      return containerFactory;
  }

KafkaProducerConfig Class

/**
 * Builds the producer settings: the bootstrap servers plus
 * {@code StringSerializer} for both the record key and the record value.
 *
 * @return a new, mutable map of producer properties
 */
public Map<String, Object> propsStringObjectMap() {
    Map<String, Object> settings = new HashMap<>();
    settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    return settings;
}
/**
 * Creates the producer factory bean from the settings returned by
 * {@link #propsStringObjectMap()}.
 *
 * @return a producer factory for {@code String} keys and {@code String} values
 */
@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> producerSettings = propsStringObjectMap();
    return new DefaultKafkaProducerFactory<>(producerSettings);
}
/**
 * Creates the {@code KafkaTemplate} bean on top of the producer factory
 * returned by {@link #producerFactory()}.
 *
 * @return a template for {@code String} keys and {@code String} values
 */
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    ProducerFactory<String, String> factory = producerFactory();
    return new KafkaTemplate<>(factory);
}
Theara
  • 79
  • 2
  • 12

0 Answers0