I want to implement a Kafka topic that sends and receives serialized Java objects, based on this example.
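The request/response factory classes themselves are not shown below; they are plain Serializable POJOs, roughly like this (simplified sketch with only the fields used in this post; AuthRequestFactory and AuthResponseFactory look the same):
public class SaleRequestFactory implements Serializable {

    private int id;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }
}

public class SaleResponseFactory implements Serializable {

    private String unique_id;

    public String getUnique_id() {
        return unique_id;
    }

    public void setUnique_id(String unique_id) {
        this.unique_id = unique_id;
    }
}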
I tried this:
Producer Config:
@Configuration
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, Object> requestFactoryProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, RequestFactorySerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> requestFactoryKafkaTemplate() {
        return new KafkaTemplate<>(requestFactoryProducerFactory());
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "tp-sale.reply");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ResponseFactoryDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate(ProducerFactory<String, Object> producerFactory,
            ConcurrentKafkaListenerContainerFactory<String, Object> factory) {
        ConcurrentMessageListenerContainer<String, Object> kafkaMessageListenerContainer = factory.createContainer("tp-sale.reply");
        ReplyingKafkaTemplate<String, Object, Object> requestReplyKafkaTemplate =
                new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer);
        requestReplyKafkaTemplate.setDefaultTopic("tp-sale.reply");
        return requestReplyKafkaTemplate;
    }
}
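One detail that matters when reading the log further down: ReplyingKafkaTemplate uses a default reply timeout of 5 seconds, which is exactly the gap between the send and the "Reply timed out" warning below. If a longer window is wanted, the bean above could be built like this (just a sketch; setDefaultReplyTimeout takes a java.time.Duration and is available in recent spring-kafka versions):
@Bean
public ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate(ProducerFactory<String, Object> producerFactory,
        ConcurrentKafkaListenerContainerFactory<String, Object> factory) {
    ConcurrentMessageListenerContainer<String, Object> replyContainer = factory.createContainer("tp-sale.reply");
    ReplyingKafkaTemplate<String, Object, Object> template = new ReplyingKafkaTemplate<>(producerFactory, replyContainer);
    // raise the default 5-second reply timeout to match the 10-second future.get() calls in the controller
    template.setDefaultReplyTimeout(Duration.ofSeconds(10));
    template.setDefaultTopic("tp-sale.reply");
    return template;
}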
Producer:
private final KafkaTemplate<String, Object> requestFactoryKafkaTemplate;
private final ReplyingKafkaTemplate<String, Object, Object> requestReplyKafkaTemplate;

@Autowired
public CheckoutController(KafkaTemplate<String, Object> requestFactoryKafkaTemplate,
                          ReplyingKafkaTemplate<String, Object, Object> requestReplyKafkaTemplate) {
    this.requestFactoryKafkaTemplate = requestFactoryKafkaTemplate;
    this.requestReplyKafkaTemplate = requestReplyKafkaTemplate;
}

@PostMapping("sale_test")
public void performSaleTest() throws ExecutionException, InterruptedException, TimeoutException {
    SaleRequestFactory obj = new SaleRequestFactory();
    obj.setId(100);

    ProducerRecord<String, Object> record = new ProducerRecord<>("tp-sale.request", obj);
    RequestReplyFuture<String, Object, Object> replyFuture = requestReplyKafkaTemplate.sendAndReceive(record);
    SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
    ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);

    SaleResponseFactory value = (SaleResponseFactory) consumerRecord.value();
    System.out.println("!!!!!!!!!!!! " + value.getUnique_id());
}

@PostMapping("authorize_test")
public void performAuthTest() throws ExecutionException, InterruptedException, TimeoutException {
    AuthRequestFactory obj = new AuthRequestFactory();
    obj.setId(140);

    ProducerRecord<String, Object> record = new ProducerRecord<>("tp-sale.request", obj);
    RequestReplyFuture<String, Object, Object> replyFuture = requestReplyKafkaTemplate.sendAndReceive(record);
    SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
    ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);

    AuthResponseFactory value = (AuthResponseFactory) consumerRecord.value();
    System.out.println("!!!!!!!!!!!! " + value.getUnique_id());
}
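The plain requestFactoryKafkaTemplate is injected as well but unused in the two methods above; a fire-and-forget send (no reply expected) would look roughly like this (sketch, hypothetical endpoint name):
@PostMapping("sale_async_test")
public void performAsyncSaleTest() {
    SaleRequestFactory obj = new SaleRequestFactory();
    obj.setId(100);

    // plain send: no correlation headers and no reply container involved
    requestFactoryKafkaTemplate.send("tp-sale.request", obj);
}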
RequestFactoryDeserializer:
public class RequestFactoryDeserializer implements Serializable, Deserializer<Object> {

    @Override
    public Object deserialize(String topic, byte[] data) {
        ByteArrayInputStream bis = new ByteArrayInputStream(data);
        try (ObjectInputStream in = new ObjectInputStream(bis)) {
            // read the object before the stream is closed
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("Unhandled", e);
        }
    }
}
RequestFactorySerializer:
public class RequestFactorySerializer implements Serializable, Serializer<Object> {

    @Override
    public byte[] serialize(String topic, Object data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ObjectOutputStream outputStream = new ObjectOutputStream(out)) {
            outputStream.writeObject(data);
            // flush the ObjectOutputStream so all buffered bytes reach the byte array
            outputStream.flush();
            return out.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("Unhandled", e);
        }
    }
}
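For reference, this serializer/deserializer pair can be sanity-checked in isolation with a quick round trip like this (standalone sketch; the topic argument is ignored by both classes):
public class SerializationRoundTripTest {

    public static void main(String[] args) {
        SaleRequestFactory request = new SaleRequestFactory();
        request.setId(100);

        // serialize with the producer-side serializer, then read it back with the consumer-side deserializer
        byte[] bytes = new RequestFactorySerializer().serialize("tp-sale.request", request);
        SaleRequestFactory roundTripped =
                (SaleRequestFactory) new RequestFactoryDeserializer().deserialize("tp-sale.request", bytes);

        System.out.println("Round-tripped id: " + roundTripped.getId());
    }
}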
ResponseFactoryDeserializer:
public class ResponseFactoryDeserializer implements Serializable, Deserializer<AuthResponseFactory> {

    @Override
    public AuthResponseFactory deserialize(String topic, byte[] data) {
        AuthResponseFactory authResponseFactory = null;
        try {
            ByteArrayInputStream bis = new ByteArrayInputStream(data);
            ObjectInputStream in = new ObjectInputStream(bis);
            authResponseFactory = (AuthResponseFactory) in.readObject();
            in.close();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("Unhandled", e);
        }
        return authResponseFactory;
    }
}
ResponseFactorySerializer:
public class ResponseFactorySerializer implements Serializable, Serializer<AuthResponseFactory> {

    @Override
    public byte[] serialize(String topic, AuthResponseFactory data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            ObjectOutputStream outputStream = new ObjectOutputStream(out);
            outputStream.writeObject(data);
            // flush the ObjectOutputStream so all buffered bytes reach the byte array
            outputStream.flush();
        } catch (IOException e) {
            throw new RuntimeException("Unhandled", e);
        }
        return out.toByteArray();
    }
}
Consumer configuration:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "tp-sale.request");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, RequestFactoryDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ProducerFactory<String, Object> saleResponseFactoryProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ResponseFactorySerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setReplyTemplate(saleResponseFactoryKafkaTemplate());
        return factory;
    }

    @Bean
    public KafkaTemplate<String, Object> saleResponseFactoryKafkaTemplate() {
        return new KafkaTemplate<>(saleResponseFactoryProducerFactory());
    }
}
Consumer:
@Component
@KafkaListener(id = "so65866763", topics = "tp-sale.request")
public class ConsumerListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerListener.class);

    @KafkaHandler
    @SendTo("tp-sale.reply")
    public AuthResponseFactory fooListener(AuthRequestFactory authRequestFactory) {
        System.out.println("In AuthRequestFactoryListener: " + authRequestFactory);
        AuthResponseFactory resObj = new AuthResponseFactory();
        resObj.setUnique_id("123123");
        return resObj;
    }

    @KafkaHandler
    @SendTo("tp-sale.reply")
    public SaleResponseFactory barListener(SaleRequestFactory saleRequestFactory) {
        System.out.println("In SaleRequestFactoryListener: " + saleRequestFactory);
        SaleResponseFactory resObj = new SaleResponseFactory();
        resObj.setUnique_id("123123");
        return resObj;
    }
}
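Since the class-level @KafkaListener dispatches to the @KafkaHandler methods by payload type, a request whose type matches neither method is rejected by the listener adapter. A catch-all handler can make that visible; an extra method like this could be added to ConsumerListener (sketch; isDefault is available in recent spring-kafka versions):
@KafkaHandler(isDefault = true)
public void unknownListener(Object payload) {
    // log anything that did not match AuthRequestFactory or SaleRequestFactory
    LOGGER.warn("Received unexpected payload type: {}", payload.getClass());
}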
A full minimal working example is available.
When I run the code I get the exceptions below.
Producer exception:
21:38:18.872 [http-nio-8090-exec-3] DEBUG ReplyingKafkaTemplate[debug:313] - Sending: ProducerRecord(topic=tp-sale.request, partition=null, headers=RecordHe_replyTopic, value = [116, 112, 45, 115, 97, 108, 101, 46, 114, 101, 112, 108, 121]), RecordHeader(key = kafka_correlationId, value = [-117, -39, -101, -82, 87, 108, 121])], isReadOnly = false), key=null, value=org.engine.plugin.transactions.factory.SaleRequestFactory@3266d963, timestamp=null) with correlationI775]
21:38:23.512 [consumer-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Received: 0 records
21:38:23.513 [consumer-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {}
21:38:23.874 [ThreadPoolTaskScheduler-1] WARN ReplyingKafkaTemplate[warn:271] - Reply timed out for: ProducerRecord(topic=tp-sale.request, partition=null, ader(key = kafka_replyTopic, value = [116, 112, 45, 115, 97, 108, 101, 46, 114, 101, 112, 108, 121]), RecordHeader(key = kafka_correlationId, value = [-117,5, 58, 55, -104, 87, 108, 121])], isReadOnly = true), key=null, value=org.engine.plugin.transactions.factory.SaleRequestFactory@3266d963, timestamp=null) wi361922472052560775]
21:38:23.875 [http-nio-8090-exec-3] DEBUG DispatcherServlet[logResult:1101] - Failed to complete request: java.util.concurrent.ExecutionException: org.sprinimeoutException: Reply timed out
21:38:23.876 [http-nio-8090-exec-3] DEBUG HstsHeaderWriter[writeHeaders:169] - Not injecting HSTS header since it did not match the requestMatcher org.sprintsHeaderWriter$SecureRequestMatcher@20269290
21:38:23.876 [http-nio-8090-exec-3] DEBUG HttpSessionSecurityContextRepository[saveContext:351] - SecurityContext is empty or contents are anonymous - conte
21:38:23.876 [http-nio-8090-exec-3] DEBUG SecurityContextPersistenceFilter[doFilter:119] - SecurityContextHolder now cleared, as request processing complete
21:38:23.877 [http-nio-8090-exec-3] ERROR [dispatcherServlet][log:175] - Servlet.service() for servlet [dispatcherServlet] in context with path [/engine] th nested exception is java.util.concurrent.ExecutionException: org.springframework.kafka.requestreply.KafkaReplyTimeoutException: Reply timed out] with root
org.springframework.kafka.requestreply.KafkaReplyTimeoutException: Reply timed out
at org.springframework.kafka.requestreply.ReplyingKafkaTemplate.lambda$scheduleTimeout$3(ReplyingKafkaTemplate.java:342)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:835)
21:38:23.877 [http-nio-8090-exec-3] DEBUG FilterChainProxy[doFilter:328] - /error at position 1 of 9 in additional filter chain; firing Filter: 'WebAsyncMan
21:38:23.879 [http-nio-8090-exec-3] DEBUG FilterChainProxy[doFilter:328] - /error at position 6 of 9 in additional filter chain; firing Filter: 'SecurityCon
21:38:23.879 [http-nio-8090-exec-3] DEBUG FilterChainProxy[doFilter:328] - /error at position 7 of 9 in additional filter chain; firing Filter: 'AnonymousAu
21:38:23.879 [http-nio-8090-exec-3] DEBUG AnonymousAuthenticationFilter[doFilter:100] - Populated SecurityContextHolder with anonymous token: 'org.springfrauthenticationToken@3e887b61: Principal: anonymousUser; Credentials: [PROTECTED]; Authenticated: true; Details: org.springframework.security.web.authenticatidress: 95.42.192.214; SessionId: null; Granted Authorities: ROLE_ANONYMOUS' 21:38:27.131 [consumer-0-C-1] INFO AbstractCoordinator[maybeLeaveGroup:979] - [Consumer clientId=consumer-tp-sale.reply-1, groupId=tp-sale.reply] Member co-9bc4-21d08778ca0d sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to the consumer unsubscribed from all topics
21:38:27.135 [consumer-0-C-1] INFO KafkaConsumer[unsubscribe:1082] - [Consumer clientId=consumer-tp-sale.reply-1, groupId=tp-sale.reply] Unsubscribed all t
21:38:27.135 [consumer-0-C-1] INFO ThreadPoolTaskScheduler[shutdown:218] - Shutting down ExecutorService
21:38:27.150 [consumer-0-C-1] INFO KafkaMessageListenerContainer$ListenerConsumer[info:292] - tp-sale.reply: Consumer stopped
21:38:27.151 [consumer-0-C-1] DEBUG KafkaMessageListenerContainer[debug:313] - KafkaMessageListenerContainer [id=consumer-0, clientIndex=-0, topicPartitions
21:38:27.152 [SpringContextShutdownHook] DEBUG DefaultLifecycleProcessor[lambda$doStop$2:242] - Bean 'replyKafkaTemplate' completed its stop procedure
21:38:27.156 [SpringContextShutdownHook] INFO ThreadPoolTaskExecutor[shutdown:218] - Shutting down ExecutorService 'applicationTaskExecutor'
21:38:27.157 [SpringContextShutdownHook] INFO ThreadPoolTaskScheduler[shutdown:218] - Shutting down ExecutorService
Consumer exception:
21:38:16.005 [so65866763-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Received: 0 records
21:38:16.005 [so65866763-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {}
21:38:18.879 [so65866763-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Received: 1 records
21:38:18.880 [so65866763-0-C-1] DEBUG RecordMessagingMessageListenerAdapter[debug:191] - Processing [GenericMessage [payload=org.engine.plugin.transactions.ers={kafka_offset=280, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5fefd46b, kafka_correlationId=[B@52c3faa6, kafka_timestampType=CREATE_receivedPartitionId=0, kafka_receivedTopic=tp-sale.request, kafka_receivedTimestamp=1611956298872, kafka_groupId=so65866763}]]
In SaleRequestFactoryListener: org.engine.plugin.transactions.factory.SaleRequestFactory@79e222f9
21:38:18.880 [so65866763-0-C-1] DEBUG RecordMessagingMessageListenerAdapter[debug:313] - Listener method returned result [InvocationResult [result=org.engineFactory@2f8d0340, sendTo=tp-sale.reply, messageReturnType=false]] - generating response message for it
21:38:18.880 [so65866763-0-C-1] DEBUG AbstractKafkaHeaderMapper$SimplePatternBasedHeaderMatcher[debug:313] - headerName=[kafka_correlationId] WILL be mapped
21:38:18.880 [so65866763-0-C-1] DEBUG AbstractKafkaHeaderMapper$SimplePatternBasedHeaderMatcher[debug:313] - headerName=[id] WILL NOT be mapped, matched pat
21:38:18.880 [so65866763-0-C-1] DEBUG AbstractKafkaHeaderMapper$SimplePatternBasedHeaderMatcher[debug:313] - headerName=[timestamp] WILL NOT be mapped, matc
21:38:18.881 [so65866763-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {}
21:38:18.881 [so65866763-0-C-1] ERROR SeekToCurrentErrorHandler[error:149] - Backoff none exhausted for ConsumerRecord(topic = tp-sale.request, partition = ime = 1611956298872, serialized key size = -1, serialized value size = 87, headers = RecordHeaders(headers = [RecordHeader(key = kafka_replyTopic, value = [ 101, 112, 108, 121]), RecordHeader(key = kafka_correlationId, value = [-117, -39, -101, -82, -42, 29, 65, 108, -97, 65, 58, 55, -104, 87, 108, 121])], isReengine.plugin.transactions.factory.SaleRequestFactory@79e222f9)
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.apache.kafka.common.errors.SerializationExceptine.plugin.transactions.factory.SaleResponseFactory to class org.engine.plugin.transactions.factory.ResponseFactorySerializer specified in value.serializer
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2040)
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:835)
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.engine.plugin.transactions.factory.SaleResponseFactory totory.ResponseFactorySerializer specified in value.serializer
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.sprinader @6267c3bb)
at org.engine.plugin.transactions.factory.ResponseFactorySerializer.serialize(ResponseFactorySerializer.java:10)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:902)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:781)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:562)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:401)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.sendReplyForMessageSource(MessagingMessageListenerAdapter.java:517)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.sendSingleResult(MessagingMessageListenerAdapter.java:484)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.sendResponse(MessagingMessageListenerAdapter.java:445)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.handleResult(MessagingMessageListenerAdapter.java:370)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:88)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1992)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1974)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1911)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1851)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1748)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1472)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1135)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:835)
21:38:18.882 [so65866763-0-C-1] DEBUG SeekToCurrentErrorHandler[debug:313] - Skipping seek of: ConsumerRecord(topic = tp-sale.request, partition = 0, leader1956298872, serialized key size = -1, serialized value size = 87, headers = RecordHeaders(headers = [RecordHeader(key = kafka_replyTopic, value = [116, 112,, 108, 121]), RecordHeader(key = kafka_correlationId, value = [-117, -39, -101, -82, -42, 29, 65, 108, -97, 65, 58, 55, -104, 87, 108, 121])], isReadOnly = ugin.transactions.factory.SaleRequestFactory@79e222f9)
21:38:18.882 [so65866763-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {tp-sale.request-0=OffsetAndMetadata{offset=2
21:38:18.883 [so65866763-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Committing: {tp-sale.request-0=OffsetAndMetadata{offset=28
^C21:38:23.083 [SpringContextShutdownHook] DEBUG AnnotationConfigApplicationContext[doClose:1006] - Closing org.springframework.context.annotation.Annotatiod on Fri Jan 29 21:36:41 UTC 2021, parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@67e2d983
21:38:23.087 [SpringContextShutdownHook] DEBUG DefaultLifecycleProcessor[stop:369] - Stopping beans in phase 2147483547
21:38:23.095 [so65866763-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {}
21:38:23.095 [so65866763-0-C-1] INFO ConsumerCoordinator[invokePartitionsRevoked:292] - [Consumer clientId=consumer-so65866763-1, groupId=so65866763] Revokrequest-0
21:38:23.096 [so65866763-0-C-1] INFO KafkaMessageListenerContainer[info:292] - so65866763: partitions revoked: [tp-sale.request-0]
21:38:23.096 [so65866763-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {}
21:38:23.097 [so65866763-0-C-1] INFO AbstractCoordinator[maybeLeaveGroup:979] - [Consumer clientId=consumer-so65866763-1, groupId=so65866763] Member consumfacf6942795 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to the consumer unsubscribed from all topics
21:38:23.098 [so65866763-0-C-1] INFO KafkaConsumer[unsubscribe:1082] - [Consumer clientId=consumer-so65866763-1, groupId=so65866763] Unsubscribed all topic
21:38:23.098 [so65866763-0-C-1] INFO ThreadPoolTaskScheduler[shutdown:218] - Shutting down ExecutorService
21:38:23.111 [so65866763-0-C-1] INFO KafkaMessageListenerContainer$ListenerConsumer[info:292] - so65866763: Consumer stopped
21:38:23.115 [so65866763-0-C-1] DEBUG KafkaMessageListenerContainer[debug:313] - KafkaMessageListenerContainer [id=so65866763-0, clientIndex=-0, topicPartit
21:38:23.116 [so65866763-0-C-1] DEBUG DefaultLifecycleProcessor[lambda$doStop$2:242] - Bean 'org.springframework.kafka.config.internalKafkaListenerEndpointR
21:38:23.119 [SpringContextShutdownHook] INFO KafkaProducer[close:1182] - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30
Do you know where I'm wrong? I can't find my mistake.