0

I want to implement a Kafka topic that sends and receives serialized Java objects, based on this example.

I tried this:

Producer Config:

    /**
     * Kafka beans for the side that sends requests and waits for replies.
     *
     * <p>Declares a producer factory and plain template for outgoing records, a consumer
     * factory and listener container factory used to read replies, and the
     * {@link ReplyingKafkaTemplate} that combines a producer factory with a reply container.
     * Values are written with {@code ObjectFactorySerializer} and read with
     * {@code ObjectFactoryDeserializer}; keys are plain strings.
     */
    @Configuration
public class KafkaProducerConfig {

    /** Used both as the reply topic name and as the consumer group id of the reply consumer. */
    private static final String REPLY_TOPIC = "tp-sale.reply";

    @Value("${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    /**
     * Builds the producer factory used for outgoing records.
     *
     * @return a producer factory with String keys and Java-serialized values
     */
    @Bean
    public ProducerFactory<String, Object> requestFactoryProducerFactory() {
        Map<String, Object> producerSettings = new HashMap<>();
        producerSettings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        producerSettings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerSettings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectFactorySerializer.class);
        return new DefaultKafkaProducerFactory<>(producerSettings);
    }

    /**
     * Plain (non-replying) template on top of {@link #requestFactoryProducerFactory()}.
     *
     * @return the template
     */
    @Bean
    public KafkaTemplate<String, Object> requestFactoryKafkaTemplate() {
        ProducerFactory<String, Object> producers = requestFactoryProducerFactory();
        return new KafkaTemplate<>(producers);
    }

    /**
     * Builds the consumer factory used to read replies.
     *
     * @return a consumer factory with String keys and Java-deserialized values
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> consumerSettings = new HashMap<>();
        consumerSettings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        consumerSettings.put(ConsumerConfig.GROUP_ID_CONFIG, REPLY_TOPIC);
        consumerSettings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerSettings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectFactoryDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(consumerSettings);
    }

    /**
     * Listener container factory backed by {@link #consumerFactory()}.
     *
     * @return the container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        return containerFactory;
    }

    /**
     * Request/reply template: sends with the given producer factory and receives replies
     * through a container created for the reply topic.
     *
     * @param producerFactory producer factory used for outgoing requests
     * @param factory         container factory used to create the reply container
     * @return the configured replying template
     */
    @Bean
    public ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate(ProducerFactory<String, Object> producerFactory, ConcurrentKafkaListenerContainerFactory<String, Object> factory) {
        ConcurrentMessageListenerContainer<String, Object> replyContainer =
                factory.createContainer(REPLY_TOPIC);
        ReplyingKafkaTemplate<String, Object, Object> replyingTemplate =
                new ReplyingKafkaTemplate<>(producerFactory, replyContainer);
        // NOTE(review): the default *send* topic is set to the reply topic; confirm this is
        // intended, since requests are expected to go to a different topic.
        replyingTemplate.setDefaultTopic(REPLY_TOPIC);
        return replyingTemplate;
    }
}

Producer:

/**
 * Test endpoints that send a request object to Kafka through a
 * {@link ReplyingKafkaTemplate} and block until the matching reply arrives.
 */
@RestController
@RequestMapping("/checkout")
public class CheckoutController {

    private static final Logger LOG = LoggerFactory.getLogger(CheckoutController.class);

    /** Topic every request in this controller is published to. */
    private static final String REQUEST_TOPIC = "tp-sale.request";

    /** Upper bound, in seconds, for both the send acknowledgement and the reply. */
    private static final long TIMEOUT_SECONDS = 10;

    // Not used by the endpoints below; kept because it is part of the constructor signature.
    private final KafkaTemplate<String, Object> requestFactoryKafkaTemplate;
    private final ReplyingKafkaTemplate<String, Object, Object> requestReplyKafkaTemplate;

    /**
     * @param requestFactoryKafkaTemplate plain template (currently unused by the endpoints)
     * @param requestReplyKafkaTemplate   template used for the request/reply exchange
     */
    @Autowired
    public CheckoutController(KafkaTemplate<String, Object> requestFactoryKafkaTemplate,
                              ReplyingKafkaTemplate<String, Object, Object> requestReplyKafkaTemplate){
        this.requestFactoryKafkaTemplate = requestFactoryKafkaTemplate;
        this.requestReplyKafkaTemplate = requestReplyKafkaTemplate;
    }

    /**
     * Sends a {@code SaleRequestFactory} with id 100 and logs the unique id of the reply.
     *
     * @throws ExecutionException   if sending or receiving completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException     if the send or the reply takes longer than the timeout
     */
    @PostMapping("sale_test")
    public void performSaleTest() throws ExecutionException, InterruptedException, TimeoutException {
        SaleRequestFactory request = new SaleRequestFactory();
        request.setId(100);

        SaleResponseFactory reply = exchange(request, SaleResponseFactory.class);
        LOG.info("!!!!!!!!!!!! {}", reply.getUnique_id());
    }

    /**
     * Sends an {@code AuthRequestFactory} with id 140 and logs the unique id of the reply.
     *
     * @throws ExecutionException   if sending or receiving completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException     if the send or the reply takes longer than the timeout
     */
    @PostMapping("authorize_test")
    public void performAuthTest() throws ExecutionException, InterruptedException, TimeoutException {
        AuthRequestFactory request = new AuthRequestFactory();
        request.setId(140);

        AuthResponseFactory reply = exchange(request, AuthResponseFactory.class);
        LOG.info("!!!!!!!!!!!! {}", reply.getUnique_id());
    }

    /**
     * Publishes {@code request} to the request topic, waits for the send to be acknowledged,
     * then waits for the reply and casts its value to {@code replyType}.
     *
     * @throws ClassCastException if the reply value is not an instance of {@code replyType}
     */
    private <T> T exchange(Object request, Class<T> replyType)
            throws ExecutionException, InterruptedException, TimeoutException {
        ProducerRecord<String, Object> record = new ProducerRecord<>(REQUEST_TOPIC, request);
        RequestReplyFuture<String, Object, Object> replyFuture =
                requestReplyKafkaTemplate.sendAndReceive(record);

        // Block on the send first so that a failed publish surfaces before waiting for a reply.
        replyFuture.getSendFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

        ConsumerRecord<String, Object> consumerRecord = replyFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        return replyType.cast(consumerRecord.value());
    }
}

ObjectFactoryDeserializer

    /**
     * Kafka value deserializer that reconstructs objects written with Java native
     * serialization ({@link ObjectInputStream}).
     *
     * <p>NOTE(review): native Java deserialization executes class-defined logic from the
     * payload and is unsafe if any producer on the topic is not fully trusted. Consider an
     * {@code ObjectInputFilter} or a data format such as JSON.
     */
    public class ObjectFactoryDeserializer implements Deserializer<Object> {

    /**
     * Deserializes {@code data} into the object it was serialized from.
     *
     * @param topic topic the record came from; used only in error messages
     * @param data  serialized bytes, may be {@code null}
     * @return the deserialized object, or {@code null} when {@code data} is {@code null}
     * @throws UncheckedIOException  if the bytes are not a readable serialization stream
     * @throws IllegalStateException if the serialized class is not on the classpath
     */
    @Override
    public Object deserialize(String topic, byte[] data) {
        if (data == null) {
            // A record without a value has nothing to decode.
            return null;
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        }
        catch (IOException e) {
            throw new UncheckedIOException("Cannot deserialize record value from topic " + topic, e);
        }
        catch (ClassNotFoundException e) {
            throw new IllegalStateException("Unknown class in record value from topic " + topic, e);
        }
    }

    /**
     * Header-aware variant; headers are not needed, so this delegates to
     * {@link #deserialize(String, byte[])}.
     *
     * @param topic   topic the record came from
     * @param headers record headers (ignored)
     * @param data    serialized bytes, may be {@code null}
     * @return the deserialized object, or {@code null} when {@code data} is {@code null}
     */
    @Override
    public Object deserialize(String topic, Headers headers, byte[] data) {
        return deserialize(topic, data);
    }

}

ObjectFactorySerializer

/**
 * Kafka value serializer that writes objects with Java native serialization
 * ({@link ObjectOutputStream}).
 */
public class ObjectFactorySerializer implements Serializer<Object> {

    /**
     * Serializes {@code data} to a byte array.
     *
     * @param topic topic the record is destined for; used only in error messages
     * @param data  object to write
     * @return the serialized bytes
     * @throws UncheckedIOException if the object cannot be written, for example because it
     *                              does not implement {@link java.io.Serializable}
     */
    @Override
    public byte[] serialize(String topic, Object data) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(data);
            // Push any buffered bytes into baos before it is read.
            oos.flush();
            return baos.toByteArray();
        }
        catch (IOException e) {
            throw new UncheckedIOException("Cannot serialize record value for topic " + topic, e);
        }
    }

    /**
     * Header-aware variant; headers are not needed, so this delegates to
     * {@link #serialize(String, Object)}.
     *
     * @param topic   topic the record is destined for
     * @param headers record headers (ignored)
     * @param data    object to write
     * @return the serialized bytes
     */
    @Override
    public byte[] serialize(String topic, Headers headers, Object data) {
        return serialize(topic, data);
    }

}

Consumer configuration:

/**
 * Kafka beans for the side that listens for requests and sends replies.
 *
 * <p>Declares the consumer factory and listener container factory used to read requests,
 * plus the producer factory and template that the container factory uses as its reply
 * template. Values are read with {@code ObjectFactoryDeserializer} and written with
 * {@code ObjectFactorySerializer}; keys are plain strings.
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    /** Consumer group id used by the request consumer. */
    private static final String REQUEST_GROUP_ID = "tp-sale.request";

    @Value("${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    /**
     * Builds the consumer factory used to read incoming requests.
     *
     * @return a consumer factory with String keys and Java-deserialized values
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> consumerSettings = new HashMap<>();
        consumerSettings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        consumerSettings.put(ConsumerConfig.GROUP_ID_CONFIG, REQUEST_GROUP_ID);
        consumerSettings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerSettings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectFactoryDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(consumerSettings);
    }

    /**
     * Builds the producer factory used to publish replies.
     *
     * @return a producer factory with String keys and Java-serialized values
     */
    @Bean
    public ProducerFactory<String, Object> saleResponseFactoryProducerFactory() {
        Map<String, Object> producerSettings = new HashMap<>();
        producerSettings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        producerSettings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerSettings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectFactorySerializer.class);
        return new DefaultKafkaProducerFactory<>(producerSettings);
    }

    /**
     * Template on top of {@link #saleResponseFactoryProducerFactory()}, used for replies.
     *
     * @return the reply template
     */
    @Bean
    public KafkaTemplate<String, Object> saleResponseFactoryKafkaTemplate() {
        ProducerFactory<String, Object> producers = saleResponseFactoryProducerFactory();
        return new KafkaTemplate<>(producers);
    }

    /**
     * Listener container factory that reads with {@link #consumerFactory()} and replies
     * through {@link #saleResponseFactoryKafkaTemplate()}.
     *
     * @return the container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setReplyTemplate(saleResponseFactoryKafkaTemplate());
        return containerFactory;
    }

}

Consumer

/**
 * Listens on the {@code tp-sale.request} topic and answers each request on
 * {@code tp-sale.reply}. The handler method is chosen by the type of the incoming payload.
 */
@Component
@KafkaListener(id = "tp-sale.request", topics = "tp-sale.request")
public class ConsumerListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerListener.class);

    /**
     * Handles authorization requests.
     *
     * @param authRequestFactory the incoming request
     * @return a response carrying a fixed unique id
     */
    @KafkaHandler
    @SendTo("tp-sale.reply")
    public AuthResponseFactory fooListener(AuthRequestFactory authRequestFactory) {
        LOGGER.info("In AuthRequestFactoryListener: {}", authRequestFactory);

        AuthResponseFactory response = new AuthResponseFactory();
        response.setUnique_id("123123");
        return response;
    }

    /**
     * Handles sale requests.
     *
     * @param saleRequestFactory the incoming request
     * @return a response carrying a fixed unique id
     */
    @KafkaHandler
    @SendTo("tp-sale.reply")
    public SaleResponseFactory barListener(SaleRequestFactory saleRequestFactory) {
        LOGGER.info("In SaleRequestFactoryListener: {}", saleRequestFactory);

        SaleResponseFactory response = new SaleResponseFactory();
        response.setUnique_id("123123");
        return response;
    }
}

Full minimal working example

When I hit the endpoint authorize_test the code is working fine.

When I hit the endpoint sale_test I get this exception:

Producer exception:

14:06:48.706 [consumer-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {}
14:06:48.706 [consumer-0-C-1] DEBUG RecoveringBatchErrorHandler[debug:200] - Expected a BatchListenerFailedException; re-seeking batch
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed, please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
        at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1213)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1117)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:835)
14:06:48.707 [consumer-0-C-1] ERROR KafkaMessageListenerContainer$ListenerConsumer[error:149] - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed, please seek past the record to continue consumption.
        at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72)
        at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1372)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1070)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:835)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed, please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
        at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1213)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1117)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:835)
14:06:48.970 [http-nio-8090-exec-3] ERROR HandlerExecutionChain[triggerAfterCompletion:192] - HandlerInterceptor.afterCompletion threw exception
java.lang.NullPointerException: null
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
        at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
        at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542)
14:06:49.142 [http-nio-8090-exec-3] DEBUG DispatcherServlet[logResult:1101] - Failed to complete request: java.lang.InterruptedException
14:06:49.143 [http-nio-8090-exec-3] DEBUG HstsHeaderWriter[writeHeaders:169] - Not injecting HSTS header since it did not match the requestMatcher
14:06:49.149 [consumer-0-C-1] DEBUG RecoveringBatchErrorHandler[debug:200] - Expected a BatchListenerFailedException; re-seeking batch
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed, please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
        at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
14:06:49.149 [consumer-0-C-1] ERROR KafkaMessageListenerContainer$ListenerConsumer[error:149] - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed, please seek past the record to continue consumption.
        at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72)
        at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1372)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1070)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:835)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed, please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
        at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1213)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1117)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:835)
14:06:49.150 [consumer-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {}
14:06:49.151 [consumer-0-C-1] DEBUG RecoveringBatchErrorHandler[debug:200] - Expected a BatchListenerFailedException; re-seeking batch
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed, please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
        at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1213)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1117)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:835)
14:06:49.152 [consumer-0-C-1] ERROR KafkaMessageListenerContainer$ListenerConsumer[error:149] - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed, please seek past the record to continue consumption.
        at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72)
        at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1372)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1070)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:835)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed, please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
        at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1213)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1117)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:835)
14:06:49.157 [consumer-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {}

 

Full log https://pastebin.com/Z5XJCNhA

Do you know where I went wrong? I can't find my mistake. It looks like requestReplyKafkaTemplate is not configured properly.

Peter Penzov
  • 1,126
  • 134
  • 430
  • 808
  • According to the stack trace, you are still using a ResponseFactoryDeserializer, not ObjectFactoryDeserializer – OneCricketeer Jan 30 '21 at 15:01
  • 1
    I redeployed the code. It looks like I had tested with the old code; it now works fine. One additional question: how can I improve the code? For example, if I get a serialization exception, the consumer now retries indefinitely. Can I limit it to just 3 attempts? – Peter Penzov Jan 30 '21 at 15:27
  • So, it works now? You can add `ProducerConfig.RETRIES`. As far as improving, like I said before, putting any producer configs in a class named "KafkaConsumerConfig" is super confusing. You can put `@Bean` in any class, so you might as well just make a single "KafkaClientConfig" class with both producer and consumer factories / templates – OneCricketeer Jan 30 '21 at 15:31
  • 1
    Yes, it's working. Can you show me how to restructure the code, please? You can download the project, make an edit, and push the improved code to a separate branch. – Peter Penzov Jan 30 '21 at 15:37
  • For example, `requestFactoryProducerFactory()` and `saleResponseFactoryProducerFactory()` are now the exact same. You don't need two methods when one will be fine. Same for the template methods that use those. So I'm saying just combine those two config classes – OneCricketeer Jan 30 '21 at 16:58

0 Answers0