45

In my Spring Boot/Kafka application before the library update, I used the following class org.telegram.telegrambots.api.objects.Update in order to post messages to the Kafka topic. Right now I use the following org.telegram.telegrambots.meta.api.objects.Update. As you may see - they have different packages.

After application restart I ran into the following issue:

[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition telegram.fenix.bot.update-0 at offset 4223. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'org.telegram.telegrambots.api.objects.Update' is not in the trusted packages: [java.util, java.lang, org.telegram.telegrambots.meta.api.objects]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:221) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:967) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:93) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1144) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:993) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:527) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:488) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) ~[kafka-clients-1.1.0.jar!/:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_171]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_171]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]

This is my config:

@EnableAsync
@Configuration
public class ApplicationConfig {

    @Bean
    public StringJsonMessageConverter jsonConverter() {
        return new StringJsonMessageConverter();
    }

}

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15000000);

        return props;
    }

    @Bean
    public ProducerFactory<String, Update> updateProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Update> updateKafkaTemplate() {
        return new KafkaTemplate<>(updateProducerFactory());
    }

}

@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.max.poll.interval.ms}")
    private String kafkaConsumerMaxPollIntervalMs;

    @Value("${kafka.consumer.max.poll.records}")
    private String kafkaConsumerMaxPollRecords;

    @Value("${kafka.topic.telegram.fenix.bot.update.consumer.concurrency}")
    private Integer updateConsumerConcurrency;

    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(consumerFactory(kafkaProperties));

        return factory;
    }

    @Bean
    public ConsumerFactory<String, Update> updateConsumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Update.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Update> updateKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

        ConcurrentKafkaListenerContainerFactory<String, Update> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(updateConsumerFactory(kafkaProperties));
        factory.setConcurrency(updateConsumerConcurrency);

        return factory;
    }

}

application.properties

spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=postfenix
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

How to solve this issue and let Kafka deserialize old messages into the new ones ?

UPDATED

This is my listener

@Component
public class UpdateConsumer {

    @KafkaListener(topics = "${kafka.topic.update}", containerFactory = "updateKafkaListenerContainerFactory")
    public void onUpdateReceived(ConsumerRecord<String, Update> consumerRecord, Acknowledgment ack) {

        //do some logic here

        ack.acknowledge();
    }

}
alexanoid
  • 24,051
  • 54
  • 210
  • 410

11 Answers11

39

See the documentation.

Starting with version 2.1, type information can be conveyed in record Headers, allowing the handling of multiple types. In addition, the serializer/deserializer can be configured using Kafka properties.

JsonSerializer.ADD_TYPE_INFO_HEADERS (default true); set to false to disable this feature on the JsonSerializer (sets the addTypeInfo property).

JsonDeserializer.KEY_DEFAULT_TYPE; fallback type for deserialization of keys if no header information is present.

JsonDeserializer.VALUE_DEFAULT_TYPE; fallback type for deserialization of values if no header information is present.

JsonDeserializer.TRUSTED_PACKAGES (default java.util, java.lang); comma-delimited list of package patterns allowed for deserialization; * means deserialize all.

By default, the serializer will add type information to the headers.

see the boot documentation.

Similarly, you can disable the JsonSerializer default behavior of sending type information in headers:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false

Or you can add type mapping to the inbound message converter to map the source type to the destination type.

EDIT

Having said that, what version are you using?

Noam Yizraeli
  • 4,446
  • 18
  • 35
Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • 1
    Thanks for your answer! I have added the `@KafkaListener` implementation. I use Kafka 1.1.0 – alexanoid Aug 05 '18 at 18:23
  • 2
    I meant which version of spring-kafka. You will need to do one of the suggestions I made - suppress the type headers from the sender or add mapping to the type mapper in the deserializer. The type headers take precedence over the target type passed into the deserializer's constructor. `new JsonDeserializer<>(Update.class)`. We should probably add a boolean to the deserializer to allow the supplied type to be used even if headers exist. – Gary Russell Aug 05 '18 at 19:05
  • Thanks! I use Spring Kafka 2.1.8.RELEASE managed by Spring Boot 2.0.4.RELEASE. Also, I have added `spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer` after I faced the issue described in my question.. this is my unsuccessful attempt to resolve this issue by myself – alexanoid Aug 05 '18 at 19:09
  • 2
    No; you need `spring.kafka.producer.properties.spring.json.add.type.headers=false` on the producer side - but you will need type mapping on the consumer side to read any existing messages that already have headers (unless you can consume them with your old app version). See `setTypeMapper` on the deserializer and `setIdClassMapping()` on the `DefaultJackson2JavaTypeMapper`. You will need to map the source class name to the destination class. – Gary Russell Aug 05 '18 at 19:16
  • 1
    See the link I pointed you to (Boot Kafka chapter). This is not a standard boot property. It does appear in the appendix: `spring.kafka.producer.properties.*= # Additional producer-specific properties used to configure the client.` – Gary Russell Aug 05 '18 at 19:22
  • 2
    I opened [an issue for this problem](https://github.com/spring-projects/spring-kafka/issues/768). – Gary Russell Aug 05 '18 at 19:23
  • Thansk bro, I added spring.kafka.producer.properties.spring.json.add.type.headers=false to my properties file and It worked. – Semih Erkaraca Dec 04 '22 at 00:54
25

For this one there are two ways of doing it, either in your deserializer or in your application.yml.

Either in your deserializer

In your deserializer, that you use within your DefaultKafkaConsumerFactory (to create your consumer factory). Let's say you want to make a ConsumerFactory<String, Foo> with Foo being the model/POJO you want to use for your kafka messages.

You need to addTrustedPackages from JsonDeserializer I have an example in Kotlin, but it's almost the same syntax in java:

 val deserializer = JsonDeserializer<Foo>()
 deserializer.addTrustedPackages("com.example.entity.Foo") // Adding Foo to our trusted packages

 val consumerFactory = DefaultKafkaConsumerFactory(
      consumerConfigs(),  // your consumer config
      StringDeserializer(), 
      deserializer // Using our newly created deserializer
 )

Or in your application.yml

In your application.yml file following spring-kafka instructions. We add the Foo class from com.example.entity.Foo package in the trusted store using:

spring:
  kafka:
    consumer:
      properties:
        spring.json.trusted.packages: "com.example.entity.Foo"

With spring.json.trusted.packages accepting an array of packages. You can specify a class package, or use * for any packages. In that case you don't need to pass your deserializer in DefaultKafkaConsumerFactory() only in the consumer config.

Sylhare
  • 5,907
  • 8
  • 64
  • 80
21

There are two key points should be mentioned.

  1. There are two separated project for Producer and Consumer.
  2. Then sending message(value) is an Object type rather primitive type.

The problem is that the producing message object is not available in consumer side because those are two separate projects.

Two overcome this issue please follow below mention steps in Spring boot Producer and Consumer applications.

----Producer App -------------

** Producer Configuration Class **

import com.kafka.producer.models.Container;    
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

@Bean
public ProducerFactory<String, Container> producerFactory(){

    Map<String, Object> config = new HashMap<>();

config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return new DefaultKafkaProducerFactory(config);
}

@Bean
public KafkaTemplate<String, Container> kafkaTemplate(){
    return new KafkaTemplate<>(producerFactory());
}
}

Note : Container is the custom Object to be posted in a kafka topic.


** Producer Class **

import com.kafka.producer.models.Container;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class Producer {

private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
private static final String TOPIC = "final-topic";

@Autowired
private KafkaTemplate<String, Container> kafkaTemplate;

public void sendUserMessage(Container msg) {
    LOGGER.info(String.format("\n ===== Producing message in JSON ===== \n"+msg));
    Message<Container> message = MessageBuilder
            .withPayload(msg)
            .setHeader(KafkaHeaders.TOPIC, TOPIC)
            .build();
    this.kafkaTemplate.send(message);
}
}

** Producer Controller **

import com.kafka.producer.models.Container;
import com.kafka.producer.services.Producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/message")
public class MessageController {

@Autowired
private Producer producer;

@PostMapping(value = "/publish")
public String sendMessageToKafkaTopic(@RequestBody Container containerMsg) {
    this.producer.sendUserMessage(containerMsg);
    return "Successfully Published !!";
}
}

Note: The message with type Container will be published to the kafka topic name :final-topic as JSON message.

===============================================================================

-- Consumer App --

** Configuration Class **

import com.kafka.consumer.models.Container;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerOneConfig {

@Bean
public ConsumerFactory<String, Container> consumerFactory(){
    JsonDeserializer<Container> deserializer = new JsonDeserializer<>(Container.class);
    deserializer.setRemoveTypeHeaders(false);
    deserializer.addTrustedPackages("*");
    deserializer.setUseTypeMapperForKey(true);

    Map<String, Object> config = new HashMap<>();

    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_one");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);

    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Container> kafkaListenerContainerFactory(){
    ConcurrentKafkaListenerContainerFactory<String, Container> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;

}
}

Note: Here you can see, instead of using default JsonDeserializer() we have to use custom JsonDeserializer to consume Container object type Json Messages from final-topic(topic name).


** Consumer Service **

import com.kafka.consumer.models.Container;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.io.IOException;

@Service
public class ConsumerOne {

private final Logger LOGGER = LoggerFactory.getLogger(ConsumerOne.class);

@KafkaListener(topics = "final-topic", groupId = "group_one", containerFactory = "kafkaListenerContainerFactory")
public void consumeUserMessage(@Payload Container msg, @Headers MessageHeaders headers) throws IOException {
    System.out.println("received data in Consumer One ="+ msg.getMessageTypes());
}
}
Iroshan
  • 349
  • 2
  • 4
  • 4
    Error `Invalid value org.springframework.kafka.support.serializer.JsonDeserializer@279489ac for configuration value.deserializer: Expected a Class instance or class name.` when it reaches here: `config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);` Seems I have to use the factory instead of the instance. – WesternGun Jun 08 '20 at 10:00
12

I have also faced this issue, however solutions above did not work for me. What did the trick, though, was configuring kafka consumer factory as follows:

props.put(JsonDeserializer.TRUSTED_PACKAGES, "your.package.name");
double-beep
  • 5,031
  • 17
  • 33
  • 41
mareck_ste
  • 517
  • 1
  • 6
  • 17
11
jsonDeserializer.addTrustedPackages("*");

solved my issue for spring-kafka-2.2.8.


To add it in application.properties:

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

IMPORTANT NOTE:

They have no effect if you have provided Serializer and Deserializer instances for KafkaConsumer and KafkaProducer, respectively.

References:
[1] https://docs.spring.io/spring-kafka/reference/html/#json-serde
[2] https://github.com/spring-projects/spring-kafka/issues/535

jumping_monkey
  • 5,941
  • 2
  • 43
  • 58
4

i had a similar problem when i wanted to consume messages in a consumer app I getting 2 errors :

1-The class 'someClass' is not in the trusted packages: [java.util, java.lang,If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*)

2-org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition partion at offset 9902. If needed, please seek past the record to continue consumption.

I could solve that with add this properties (JsonDeserializer.TRUSTED_PACKAGE) into kafka consumer config generator method(makeConfig) at the KafkaConfig class for consuming configs with this approach my problem has been solved:

private Map<String, Object> makeConfig(ServiceMessagePriority input)
{
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    configProps.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class);
    configProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class);
    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    configProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.core.model.ServiceMsgDTO");
    configProps.put(JsonDeserializer.USE_TYPE_INFO_HEADERS,false);
    configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return configProps;
}
mi_mo
  • 135
  • 1
  • 8
4

For those facing this when working with streams, you have to specify trusted packages for both consumer and streams, as Kafka initializes 2 different deserializers

spring:
  kafka:
    bootstrap-servers: localhost:9092
  producer:
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

  consumer:
...
    properties:
       spring.json.trusted.packages:  "YOUR ENTTIES PACKAGE"
  streams:
...
    properties:
      spring.json.trusted.packages: "YOUR ENTTIES PACKAGE"
alban maillere
  • 1,038
  • 10
  • 7
  • I've tried this solution but it does not work. Do you have a success story with it? – Oleksii Mar 26 '22 at 08:51
  • I have. Just make sure both consumer & streams dependencies are below spring-kafka dependency (not spring-cloud-stream-kafka or anything alike) and that the class is in the included package. – Guy_g23 Apr 18 '22 at 14:33
3

There're two ways to solve the problem:

  1. Disable type headers in the producer
  2. Add a list of trusted producers in the consumer

Disable type headers in the producer

Your producer properties files should look like following,

spring:
  profiles: dev
  kafka:
    producer:
      bootstrap-servers: localhost:9092, localhost:9093
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        acks: 1
        spring:
          json:
            add:
              type:
                headers: false

And your consumer properties files should be as follows,

spring:
  profiles: dev
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      group-id: consumer-group-1
      properties:
        spring:
          json:
            value:
              default:
                type: 'com.kafka.consumer.Message'

Add a list of trusted producers in the consumer

If you don't want to change anything in the producer properties file, then update your consumer properties file as follows,

spring:
  profiles: dev
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      group-id: consumer-group-1
      properties:
        spring:
          json:
            value:
              default:
                type: 'com.kafka.consumer.Message'
            type:
              mapping: 'com.kafka.producer.Message:com.kafka.consumer.Message'
            trusted:
              packages: 'com.kafka.producer'

In my Message class, make sure you have the setters and getters along with NoArgsConstructor, otherwise, it won't work.

@AllArgsConstructor
@Setter
@Getter
@NoArgsConstructor
public
class Message {

    Integer id;

    String name;

    String address;

    String phone;

    boolean isActive;
}

Consumer Listener:

@Component
@Slf4j
public class KafkaMessageListener {

    @KafkaListener(topics = {"test_json"})
    public void OnMessage(Message rc){
        log.info(rc.getName());
    }
}

Note: My Producer and Consumers are two different projects and their package names as these,

Producer: com.kafka.producer.Message

Consumer: com.kafka.consumer.Message

Awn Ali
  • 1,361
  • 1
  • 17
  • 31
1

My version of spring-kafka is 2.2.11 and also I had this error.

I got this error because I had configurated two consumers in the same kafta topic with diferent configuration. One of them had ConsumerFactory<String, OrderDTO> and other had ConsumerFactory<String, String>.

I solved the error changing the configuration of one Consumer because was wrong.

Simply you check the consumers of the topic

davidleongz
  • 155
  • 2
  • 11
1

I had the same problem and it turned out I had one too many indentations in my application.yml file. I am completely new to Springboot.

Before:

  port: 5000

spring:
  data:
    mongodb:
      host: localhost
      port: 27017
      database: bankaccount
    kafka:
      producer:
        bootstrap-servers: localhost:9092
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

After:

  port: 5000

spring:
  data:
    mongodb:
      host: localhost
      port: 27017
      database: bankaccount
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
Rami
  • 490
  • 7
  • 22
0

Use same package name for both consumer and producer , It solved my error

  • 1
    Your answer could be improved with additional supporting information. Please [edit] to add further details, such as citations or documentation, so that others can confirm that your answer is correct. You can find more information on how to write good answers [in the help center](/help/how-to-answer). – Community May 15 '22 at 23:35