I want to implement a Kafka topic that sends and receives serialized Java objects, based on this example.
I tried this:
Producer Config:
/**
 * Spring configuration for the requesting side of the request/reply exchange.
 *
 * <p>Declares a producer factory and plain template for outgoing requests, a consumer
 * factory and listener container factory for the reply topic, and a
 * {@link ReplyingKafkaTemplate} that ties the two together. Values are written with
 * {@code ObjectFactorySerializer} and read with {@code ObjectFactoryDeserializer}.
 */
@Configuration
public class KafkaProducerConfig {

    /** Name of the reply topic; the same literal is also used as the reply consumer's group id. */
    private static final String REPLY_TOPIC = "tp-sale.reply";

    /** Broker address list, injected from the {@code kafka.bootstrapAddress} property. */
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    /**
     * Builds the producer factory used to publish requests.
     *
     * @return a producer factory with String keys and {@code ObjectFactorySerializer} values
     */
    @Bean
    public ProducerFactory<String, Object> requestFactoryProducerFactory() {
        Map<String, Object> producerSettings = new HashMap<>();
        producerSettings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        producerSettings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectFactorySerializer.class);
        producerSettings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(producerSettings);
    }

    /**
     * Plain (fire-and-forget) template backed by {@link #requestFactoryProducerFactory()}.
     *
     * @return the Kafka template
     */
    @Bean
    public KafkaTemplate<String, Object> requestFactoryKafkaTemplate() {
        return new KafkaTemplate<>(requestFactoryProducerFactory());
    }

    /**
     * Builds the consumer factory used to read replies.
     *
     * @return a consumer factory with String keys and {@code ObjectFactoryDeserializer} values
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> consumerSettings = new HashMap<>();
        consumerSettings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        consumerSettings.put(ConsumerConfig.GROUP_ID_CONFIG, REPLY_TOPIC);
        consumerSettings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectFactoryDeserializer.class);
        consumerSettings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(consumerSettings);
    }

    /**
     * Listener container factory wired to {@link #consumerFactory()}.
     *
     * @return the container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        return containerFactory;
    }

    /**
     * Request/reply template: sends through the given producer factory and receives replies
     * through a container created for the reply topic.
     *
     * @param producerFactory factory used for sending requests
     * @param factory         container factory used to create the reply listener container
     * @return the replying template
     */
    @Bean
    public ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate(ProducerFactory<String, Object> producerFactory, ConcurrentKafkaListenerContainerFactory<String, Object> factory) {
        ConcurrentMessageListenerContainer<String, Object> replyContainer = factory.createContainer(REPLY_TOPIC);
        ReplyingKafkaTemplate<String, Object, Object> replyingTemplate =
                new ReplyingKafkaTemplate<>(producerFactory, replyContainer);
        // NOTE(review): the default topic is the destination for sends that omit a topic;
        // it is set to the reply topic here - confirm that is intended.
        replyingTemplate.setDefaultTopic(REPLY_TOPIC);
        return replyingTemplate;
    }
}
Producer:
/**
 * REST endpoints under {@code /checkout} that send a request object to the
 * {@code tp-sale.request} topic and block until the matching reply arrives.
 */
@RestController
@RequestMapping("/checkout")
public class CheckoutController {

    private static final Logger LOG = LoggerFactory.getLogger(CheckoutController.class);

    /** Topic the request objects are published to. */
    private static final String REQUEST_TOPIC = "tp-sale.request";

    /** Maximum wait, in seconds, applied separately to the send confirmation and to the reply. */
    private static final long TIMEOUT_SECONDS = 10;

    private final KafkaTemplate<String, Object> requestFactoryKafkaTemplate;
    private final ReplyingKafkaTemplate<String, Object, Object> requestReplyKafkaTemplate;

    /**
     * @param requestFactoryKafkaTemplate plain template (currently not used by the endpoints)
     * @param requestReplyKafkaTemplate   template used for the request/reply round trip
     */
    @Autowired
    public CheckoutController(KafkaTemplate<String, Object> requestFactoryKafkaTemplate,
                              ReplyingKafkaTemplate<String, Object, Object> requestReplyKafkaTemplate) {
        this.requestFactoryKafkaTemplate = requestFactoryKafkaTemplate;
        this.requestReplyKafkaTemplate = requestReplyKafkaTemplate;
    }

    /**
     * Sends a {@code SaleRequestFactory} with id 100 and logs the unique id of the reply.
     *
     * @throws ExecutionException    if the send or the reply future completed exceptionally
     * @throws InterruptedException  if the thread is interrupted while waiting
     * @throws TimeoutException      if the send confirmation or the reply does not arrive in time
     * @throws IllegalStateException if the reply payload is not a {@code SaleResponseFactory}
     */
    @PostMapping("sale_test")
    public void performSaleTest() throws ExecutionException, InterruptedException, TimeoutException {
        SaleRequestFactory request = new SaleRequestFactory();
        request.setId(100);
        SaleResponseFactory reply = sendAndAwaitReply(request, SaleResponseFactory.class);
        LOG.info("Sale reply unique_id: {}", reply.getUnique_id());
    }

    /**
     * Sends an {@code AuthRequestFactory} with id 140 and logs the unique id of the reply.
     *
     * @throws ExecutionException    if the send or the reply future completed exceptionally
     * @throws InterruptedException  if the thread is interrupted while waiting
     * @throws TimeoutException      if the send confirmation or the reply does not arrive in time
     * @throws IllegalStateException if the reply payload is not an {@code AuthResponseFactory}
     */
    @PostMapping("authorize_test")
    public void performAuthTest() throws ExecutionException, InterruptedException, TimeoutException {
        AuthRequestFactory request = new AuthRequestFactory();
        request.setId(140);
        AuthResponseFactory reply = sendAndAwaitReply(request, AuthResponseFactory.class);
        LOG.info("Authorize reply unique_id: {}", reply.getUnique_id());
    }

    /**
     * Publishes {@code request} to the request topic, waits for the send to be confirmed,
     * then waits for the reply and checks that its payload has the expected type.
     */
    private <T> T sendAndAwaitReply(Object request, Class<T> expectedType)
            throws ExecutionException, InterruptedException, TimeoutException {
        ProducerRecord<String, Object> record = new ProducerRecord<>(REQUEST_TOPIC, request);
        RequestReplyFuture<String, Object, Object> replyFuture = requestReplyKafkaTemplate.sendAndReceive(record);

        // Wait for the send first so a failed publish surfaces before the reply wait starts.
        replyFuture.getSendFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

        ConsumerRecord<String, Object> consumerRecord = replyFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        Object payload = consumerRecord.value();
        // isInstance is false for null, so a missing payload is reported here as well.
        if (!expectedType.isInstance(payload)) {
            String actualType = (payload == null) ? "null" : payload.getClass().getName();
            throw new IllegalStateException(
                    "Expected reply of type " + expectedType.getName() + " but received " + actualType);
        }
        return expectedType.cast(payload);
    }
}
ObjectFactoryDeserializer
/**
 * Kafka value deserializer that reads a payload written with Java object serialization.
 *
 * <p>SECURITY NOTE(review): {@link ObjectInputStream#readObject()} will instantiate whatever
 * classes the byte stream names. Only use this on topics whose producers are fully trusted,
 * or restrict the accepted classes with an {@code ObjectInputFilter}, or switch to a data
 * format such as JSON.
 */
public class ObjectFactoryDeserializer implements Deserializer<Object> {

    /**
     * Deserializes {@code data} into the object it encodes.
     *
     * @param topic topic the record was read from (used only in error messages)
     * @param data  serialized bytes; may be {@code null} for a record without a value
     * @return the decoded object, or {@code null} when {@code data} is {@code null}
     * @throws UncheckedIOException  if the bytes are not a readable serialization stream
     * @throws IllegalStateException if a class named in the stream is not on the classpath
     */
    @Override
    public Object deserialize(String topic, byte[] data) {
        if (data == null) {
            // A record without a value carries nothing to decode.
            return null;
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to deserialize value from topic " + topic, e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Unknown class in value from topic " + topic, e);
        }
    }

    /**
     * Header-aware overload; the headers are not consulted, so this delegates to
     * {@link #deserialize(String, byte[])} and both overloads behave the same.
     */
    @Override
    public Object deserialize(String topic, Headers headers, byte[] data) {
        return deserialize(topic, data);
    }
}
ObjectFactorySerializer
/**
 * Kafka value serializer that writes the payload with Java object serialization.
 */
public class ObjectFactorySerializer implements Serializer<Object> {

    /**
     * Serializes {@code data} into a Java serialization byte stream.
     *
     * <p>A {@code null} payload is written through {@link ObjectOutputStream#writeObject(Object)}
     * like any other value, so the result is a non-null byte array.
     *
     * @param topic topic the record is being sent to (used only in error messages)
     * @param data  object to serialize
     * @return the serialized bytes
     * @throws UncheckedIOException if the object cannot be written
     */
    @Override
    public byte[] serialize(String topic, Object data) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(data);
            // Push any buffered bytes into baos before taking the snapshot.
            oos.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to serialize value for topic " + topic, e);
        }
    }

    /**
     * Header-aware overload; the headers are not consulted, so this delegates to
     * {@link #serialize(String, Object)} and both overloads behave the same.
     */
    @Override
    public byte[] serialize(String topic, Headers headers, Object data) {
        return serialize(topic, data);
    }
}
Consumer configuration:
/**
 * Spring configuration for the replying side of the request/reply exchange.
 *
 * <p>Declares the consumer factory for incoming requests, the producer factory and template
 * used to publish replies, and a listener container factory that has that template set as
 * its reply template.
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    /** Broker address list, injected from the {@code kafka.bootstrapAddress} property. */
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    /**
     * Builds the consumer factory used to read requests.
     *
     * @return a consumer factory with String keys and {@code ObjectFactoryDeserializer} values
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> consumerSettings = new HashMap<>();
        consumerSettings.put(ConsumerConfig.GROUP_ID_CONFIG, "tp-sale.request");
        consumerSettings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        consumerSettings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectFactoryDeserializer.class);
        consumerSettings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(consumerSettings);
    }

    /**
     * Builds the producer factory used to publish replies.
     *
     * @return a producer factory with String keys and {@code ObjectFactorySerializer} values
     */
    @Bean
    public ProducerFactory<String, Object> saleResponseFactoryProducerFactory() {
        Map<String, Object> producerSettings = new HashMap<>();
        producerSettings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        producerSettings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectFactorySerializer.class);
        producerSettings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(producerSettings);
    }

    /**
     * Listener container factory wired to {@link #consumerFactory()}, with
     * {@link #saleResponseFactoryKafkaTemplate()} as the template for listener replies.
     *
     * @return the container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setReplyTemplate(saleResponseFactoryKafkaTemplate());
        return containerFactory;
    }

    /**
     * Template backed by {@link #saleResponseFactoryProducerFactory()}.
     *
     * @return the Kafka template used to send replies
     */
    @Bean
    public KafkaTemplate<String, Object> saleResponseFactoryKafkaTemplate() {
        ProducerFactory<String, Object> replyProducerFactory = saleResponseFactoryProducerFactory();
        return new KafkaTemplate<>(replyProducerFactory);
    }
}
Consumer
/**
 * Listener on the {@code tp-sale.request} topic. Each {@code @KafkaHandler} method accepts
 * one request type and returns the reply object, which {@code @SendTo} routes to the
 * {@code tp-sale.reply} topic.
 */
@Component
@KafkaListener(id = "tp-sale.request", topics = "tp-sale.request")
public class ConsumerListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerListener.class);

    /**
     * Handles an authorization request.
     *
     * @param authRequestFactory the received request
     * @return a reply whose unique id is the fixed value {@code "123123"}
     */
    @KafkaHandler
    @SendTo("tp-sale.reply")
    public AuthResponseFactory fooListener(AuthRequestFactory authRequestFactory) {
        LOGGER.info("In AuthRequestFactoryListener: {}", authRequestFactory);
        AuthResponseFactory resObj = new AuthResponseFactory();
        resObj.setUnique_id("123123");
        return resObj;
    }

    /**
     * Handles a sale request.
     *
     * @param saleRequestFactory the received request
     * @return a reply whose unique id is the fixed value {@code "123123"}
     */
    @KafkaHandler
    @SendTo("tp-sale.reply")
    public SaleResponseFactory barListener(SaleRequestFactory saleRequestFactory) {
        LOGGER.info("In SaleRequestFactoryListener: {}", saleRequestFactory);
        SaleResponseFactory resObj = new SaleResponseFactory();
        resObj.setUnique_id("123123");
        return resObj;
    }
}
Full minimal working example
When I hit the endpoint authorize_test, the code works fine.
When I hit the endpoint sale_test, I get this exception:
Producer exception:
14:06:48.706 [consumer-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {}
14:06:48.706 [consumer-0-C-1] DEBUG RecoveringBatchErrorHandler[debug:200] - Expected a BatchListenerFailedException; re-seeking batch
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed, please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1213)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1117)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:835)
14:06:48.707 [consumer-0-C-1] ERROR KafkaMessageListenerContainer$ListenerConsumer[error:149] - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed, please seek past the record to continue consumption.
at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72)
at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1372)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1070)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:835)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed, please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1213)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1117)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:835)
14:06:48.970 [http-nio-8090-exec-3] ERROR HandlerExecutionChain[triggerAfterCompletion:192] - HandlerInterceptor.afterCompletion threw exception
java.lang.NullPointerException: null
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542)
14:06:49.142 [http-nio-8090-exec-3] DEBUG DispatcherServlet[logResult:1101] - Failed to complete request: java.lang.InterruptedException
14:06:49.143 [http-nio-8090-exec-3] DEBUG HstsHeaderWriter[writeHeaders:169] - Not injecting HSTS header since it did not match the requestMatcher
14:06:49.149 [consumer-0-C-1] DEBUG RecoveringBatchErrorHandler[debug:200] - Expected a BatchListenerFailedException; re-seeking batch
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed, please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
14:06:49.149 [consumer-0-C-1] ERROR KafkaMessageListenerContainer$ListenerConsumer[error:149] - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed, please seek past the record to continue consumption.
at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72)
at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1372)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1070)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:835)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed, please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1213)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1117)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:835)
14:06:49.150 [consumer-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {}
14:06:49.151 [consumer-0-C-1] DEBUG RecoveringBatchErrorHandler[debug:200] - Expected a BatchListenerFailedException; re-seeking batch
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed, please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1213)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1117)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:835)
14:06:49.152 [consumer-0-C-1] ERROR KafkaMessageListenerContainer$ListenerConsumer[error:149] - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed, please seek past the record to continue consumption.
at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72)
at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1372)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1070)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:835)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed, please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1213)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1117)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:835)
14:06:49.157 [consumer-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {}
Full log https://pastebin.com/Z5XJCNhA
Do you know where I went wrong? I can't find my mistake. It looks like requestReplyKafkaTemplate
is not configured properly.