I'm new to Kafka. I have created two projects: the first is a producer and the second is a consumer.
If the model (DTO) class lives under the same package in both projects (for example, com.stackoverflow.model), everything works. It fails when the two projects define the model under different package names, as below:
- The producer defines the package com.stackoverflow.model
- The consumer defines the package com.stackoverflow.dto
Error logs:
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition txn-hub-master-0 at offset 4. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.stackoverflow.dto.Demo' is not in the trusted packages: [java.util, java.lang, com.stackoverflow.mode, com.stackoverflow.mode.model.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:126) ~[spring-kafka-2.7.3.jar:2.7.3]
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:100) ~[spring-kafka-2.7.3.jar:2.7.3]
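To show how I read the "not in the trusted packages" message, here is a minimal sketch of the kind of check I assume is happening. It is a simplified model written with plain Java only, not Spring Kafka's actual code: the class TrustedPackagesSketch and the helper isTrusted are hypothetical names, and the matching rule (exact package name, or "*" to trust everything) is my assumption based on the wording of the error.

```java
import java.util.Set;

public class TrustedPackagesSketch {

    // Hypothetical helper: a simplified model of a trusted-packages check.
    // Assumption: a class is accepted if "*" is configured or if its package
    // name is listed exactly. The real library logic may differ.
    static boolean isTrusted(String className, Set<String> trustedPackages) {
        if (trustedPackages.contains("*")) {
            return true; // "*" is treated as "trust every package"
        }
        int lastDot = className.lastIndexOf('.');
        String packageName = lastDot < 0 ? "" : className.substring(0, lastDot);
        return trustedPackages.contains(packageName);
    }

    public static void main(String[] args) {
        Set<String> trusted = Set.of("java.util", "java.lang", "com.stackoverflow.model");

        // Same package on both sides: accepted.
        System.out.println(isTrusted("com.stackoverflow.model.Demo", trusted)); // true

        // Class from a package that is not listed: rejected.
        System.out.println(isTrusted("com.stackoverflow.dto.Demo", trusted));   // false

        // Trust-all setting: accepted regardless of package.
        System.out.println(isTrusted("com.stackoverflow.dto.Demo", Set.of("*"))); // true
    }
}
```

If this model is roughly right, the "*" setting would only help if it actually reaches the deserializer that performs the check, which is what I am unsure about in my configuration.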
KafkaConsumerConfig Class
    // Shared consumer properties
    public Map<String, Object> propsStringObjectMap() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Intended to trust all packages during JSON deserialization
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = this.propsStringObjectMap();
        // Both key and value are read as plain strings here
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
KafkaProducerConfig Class
    public Map<String, Object> propsStringObjectMap() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(this.propsStringObjectMap());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }