0

I want to implement Kafka Topic which sends and receives Serialized Java Objects based on this example.

I tried this:

Producer Config:

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, Object> requestFactoryProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, RequestFactorySerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> requestFactoryKafkaTemplate() {
        return new KafkaTemplate<>(requestFactoryProducerFactory());
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "tp-sale.reply");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ResponseFactoryDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate(ProducerFactory<String, Object> producerFactory, ConcurrentKafkaListenerContainerFactory<String, Object> factory) {
        ConcurrentMessageListenerContainer<String, Object> kafkaMessageListenerContainer = factory.createContainer("tp-sale.reply");
        ReplyingKafkaTemplate<String, Object, Object> requestReplyKafkaTemplate = new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer);
        requestReplyKafkaTemplate.setDefaultTopic("tp-sale.reply");
        return requestReplyKafkaTemplate;
    }
}

Producer:

private KafkaTemplate<String, Object> requestFactoryKafkaTemplate;
    private ReplyingKafkaTemplate<String, Object, Object> requestReplyKafkaTemplate;

    @Autowired
    public CheckoutController(KafkaTemplate<String, Object> requestFactoryKafkaTemplate,
                              ReplyingKafkaTemplate<String, Object, Object> requestReplyKafkaTemplate){
        this.requestFactoryKafkaTemplate = requestFactoryKafkaTemplate;
        this.requestReplyKafkaTemplate = requestReplyKafkaTemplate;
    }

    @PostMapping("sale_test")
    public void performSaleTest() throws ExecutionException, InterruptedException, TimeoutException {

        SaleRequestFactory obj = new SaleRequestFactory();
        obj.setId(100);

        ProducerRecord<String, Object> record = new ProducerRecord<>("tp-sale.request", obj);
        RequestReplyFuture<String, Object, Object> replyFuture = requestReplyKafkaTemplate.sendAndReceive(record);
        SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
        ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);


        SaleResponseFactory value = (SaleResponseFactory) consumerRecord.value();
        System.out.println("!!!!!!!!!!!! " + value.getUnique_id());


    }

    @PostMapping("authorize_test")
    public void performAuthTest() throws ExecutionException, InterruptedException, TimeoutException {

        AuthRequestFactory obj = new AuthRequestFactory();
        obj.setId(140);

        ProducerRecord<String, Object> record = new ProducerRecord<>("tp-sale.request", obj);
        RequestReplyFuture<String, Object, Object> replyFuture = requestReplyKafkaTemplate.sendAndReceive(record);
        SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
        ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);


        AuthResponseFactory value = (AuthResponseFactory) consumerRecord.value();
        System.out.println("!!!!!!!!!!!! " + value.getUnique_id());


    }

RequestFactoryDeserializer

public class RequestFactoryDeserializer implements Serializable, Deserializer<Object> {

    @Override
    public Object deserialize(String topic, byte[] data)
    {
        ByteArrayInputStream bis = new ByteArrayInputStream(data);
        try
        {
            ObjectInputStream in = new ObjectInputStream(bis);
            in.close();
            return in.readObject();
        }
        catch (IOException | ClassNotFoundException e)
        {
            throw new RuntimeException("Unhandled", e);
        }
    }
}

RequestFactorySerializer

public class RequestFactorySerializer implements Serializable, Serializer<Object> {

    @Override
    public byte[] serialize(String topic, Object data)
    {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try
        {
            ObjectOutputStream outputStream = new ObjectOutputStream(out);
            outputStream.writeObject(data);
            out.close();
            return out.toByteArray();
        }
        catch (IOException e)
        {
            throw new RuntimeException("Unhandled", e);
        }
    }
}

ResponseFactoryDeserializer

public class ResponseFactoryDeserializer implements Serializable, Deserializer<AuthResponseFactory> {
    @Override
    public AuthResponseFactory deserialize(String topic, byte[] data)
    {
        AuthResponseFactory authResponseFactory = null;
        try
        {
            ByteArrayInputStream bis = new ByteArrayInputStream(data);
            ObjectInputStream in = new ObjectInputStream(bis);
            authResponseFactory = (AuthResponseFactory) in.readObject();
            in.close();
        }
        catch (IOException | ClassNotFoundException e)
        {
            throw new RuntimeException("Unhandled", e);
        }
        return authResponseFactory;
    }
}

ResponseFactorySerializer

public class ResponseFactorySerializer implements Serializable, Serializer<AuthResponseFactory> {

    @Override
    public byte[] serialize(String topic, AuthResponseFactory data)
    {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try
        {
            ObjectOutputStream outputStream = new ObjectOutputStream(out);
            outputStream.writeObject(data);
            out.close();
        }
        catch (IOException e)
        {
            throw new RuntimeException("Unhandled", e);
        }
        return out.toByteArray();
    }
}

Consumer configuration:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "tp-sale.request");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, RequestFactoryDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ProducerFactory<String, Object> saleResponseFactoryProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ResponseFactorySerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setReplyTemplate(saleResponseFactoryKafkaTemplate());
        return factory;
    }

    @Bean
    public KafkaTemplate<String, Object> saleResponseFactoryKafkaTemplate() {
        return new KafkaTemplate<>(saleResponseFactoryProducerFactory());
    }

}

Consumer

@Component
@KafkaListener(id = "so65866763", topics = "tp-sale.request")
public class ConsumerListener {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerListener.class);

    @KafkaHandler
    @SendTo("tp-sale.reply")
    public AuthResponseFactory fooListener(AuthRequestFactory authRequestFactory) {
        System.out.println("In AuthRequestFactoryListener: " + authRequestFactory);

        AuthResponseFactory resObj = new AuthResponseFactory();
        resObj.setUnique_id("123123");

        return resObj;
    }

    @KafkaHandler
    @SendTo("tp-sale.reply")
    public SaleResponseFactory barListener(SaleRequestFactory saleRequestFactory) {
        System.out.println("In SaleRequestFactoryListener: " + saleRequestFactory);

        SaleResponseFactory resObj = new SaleResponseFactory();
        resObj.setUnique_id("123123");

        return resObj;
    }
}

Full minimal working example

When I run the code I get this exception:

Consumer exception:

21:38:18.872 [http-nio-8090-exec-3] DEBUG ReplyingKafkaTemplate[debug:313] - Sending: ProducerRecord(topic=tp-sale.request, partition=null, headers=RecordHe_replyTopic, value = [116, 112, 45, 115, 97, 108, 101, 46, 114, 101, 112, 108, 121]), RecordHeader(key = kafka_correlationId, value = [-117, -39, -101, -82, 87, 108, 121])], isReadOnly = false), key=null, value=org.engine.plugin.transactions.factory.SaleRequestFactory@3266d963, timestamp=null) with correlationI775]
21:38:23.512 [consumer-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Received: 0 records
21:38:23.513 [consumer-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {}
21:38:23.874 [ThreadPoolTaskScheduler-1] WARN  ReplyingKafkaTemplate[warn:271] - Reply timed out for: ProducerRecord(topic=tp-sale.request, partition=null, ader(key = kafka_replyTopic, value = [116, 112, 45, 115, 97, 108, 101, 46, 114, 101, 112, 108, 121]), RecordHeader(key = kafka_correlationId, value = [-117,5, 58, 55, -104, 87, 108, 121])], isReadOnly = true), key=null, value=org.engine.plugin.transactions.factory.SaleRequestFactory@3266d963, timestamp=null) wi361922472052560775]
21:38:23.875 [http-nio-8090-exec-3] DEBUG DispatcherServlet[logResult:1101] - Failed to complete request: java.util.concurrent.ExecutionException: org.sprinimeoutException: Reply timed out
21:38:23.876 [http-nio-8090-exec-3] DEBUG HstsHeaderWriter[writeHeaders:169] - Not injecting HSTS header since it did not match the requestMatcher org.sprintsHeaderWriter$SecureRequestMatcher@20269290
21:38:23.876 [http-nio-8090-exec-3] DEBUG HttpSessionSecurityContextRepository[saveContext:351] - SecurityContext is empty or contents are anonymous - conte
21:38:23.876 [http-nio-8090-exec-3] DEBUG SecurityContextPersistenceFilter[doFilter:119] - SecurityContextHolder now cleared, as request processing complete
21:38:23.877 [http-nio-8090-exec-3] ERROR [dispatcherServlet][log:175] - Servlet.service() for servlet [dispatcherServlet] in context with path [/engine] th nested exception is java.util.concurrent.ExecutionException: org.springframework.kafka.requestreply.KafkaReplyTimeoutException: Reply timed out] with root
org.springframework.kafka.requestreply.KafkaReplyTimeoutException: Reply timed out
        at org.springframework.kafka.requestreply.ReplyingKafkaTemplate.lambda$scheduleTimeout$3(ReplyingKafkaTemplate.java:342)
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:835)
21:38:23.877 [http-nio-8090-exec-3] DEBUG FilterChainProxy[doFilter:328] - /error at position 1 of 9 in additional filter chain; firing Filter: 'WebAsyncMan
21:38:23.879 [http-nio-8090-exec-3] DEBUG FilterChainProxy[doFilter:328] - /error at position 6 of 9 in additional filter chain; firing Filter: 'SecurityCon
21:38:23.879 [http-nio-8090-exec-3] DEBUG FilterChainProxy[doFilter:328] - /error at position 7 of 9 in additional filter chain; firing Filter: 'AnonymousAu
21:38:23.879 [http-nio-8090-exec-3] DEBUG AnonymousAuthenticationFilter[doFilter:100] - Populated SecurityContextHolder with anonymous token: 'org.springfrauthenticationToken@3e887b61: Principal: anonymousUser; Credentials: [PROTECTED]; Authenticated: true; Details: org.springframework.security.web.authenticatidress: 95.42.192.214; SessionId: null; Granted Authorities: ROLE_ANONYMOUS'    21:38:27.131 [consumer-0-C-1] INFO  AbstractCoordinator[maybeLeaveGroup:979] - [Consumer clientId=consumer-tp-sale.reply-1, groupId=tp-sale.reply] Member co-9bc4-21d08778ca0d sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to the consumer unsubscribed from all topics
21:38:27.135 [consumer-0-C-1] INFO  KafkaConsumer[unsubscribe:1082] - [Consumer clientId=consumer-tp-sale.reply-1, groupId=tp-sale.reply] Unsubscribed all t
21:38:27.135 [consumer-0-C-1] INFO  ThreadPoolTaskScheduler[shutdown:218] - Shutting down ExecutorService
21:38:27.150 [consumer-0-C-1] INFO  KafkaMessageListenerContainer$ListenerConsumer[info:292] - tp-sale.reply: Consumer stopped
21:38:27.151 [consumer-0-C-1] DEBUG KafkaMessageListenerContainer[debug:313] - KafkaMessageListenerContainer [id=consumer-0, clientIndex=-0, topicPartitions
21:38:27.152 [SpringContextShutdownHook] DEBUG DefaultLifecycleProcessor[lambda$doStop$2:242] - Bean 'replyKafkaTemplate' completed its stop procedure
21:38:27.156 [SpringContextShutdownHook] INFO  ThreadPoolTaskExecutor[shutdown:218] - Shutting down ExecutorService 'applicationTaskExecutor'
21:38:27.157 [SpringContextShutdownHook] INFO  ThreadPoolTaskScheduler[shutdown:218] - Shutting down ExecutorService

Consumer exception:

    21:38:16.005 [so65866763-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Received: 0 records
    21:38:16.005 [so65866763-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {}
    21:38:18.879 [so65866763-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Received: 1 records
    21:38:18.880 [so65866763-0-C-1] DEBUG RecordMessagingMessageListenerAdapter[debug:191] - Processing [GenericMessage [payload=org.engine.plugin.transactions.ers={kafka_offset=280, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5fefd46b, kafka_correlationId=[B@52c3faa6, kafka_timestampType=CREATE_receivedPartitionId=0, kafka_receivedTopic=tp-sale.request, kafka_receivedTimestamp=1611956298872, kafka_groupId=so65866763}]]
    In SaleRequestFactoryListener: org.engine.plugin.transactions.factory.SaleRequestFactory@79e222f9
    21:38:18.880 [so65866763-0-C-1] DEBUG RecordMessagingMessageListenerAdapter[debug:313] - Listener method returned result [InvocationResult [result=org.engineFactory@2f8d0340, sendTo=tp-sale.reply, messageReturnType=false]] - generating response message for it
    21:38:18.880 [so65866763-0-C-1] DEBUG AbstractKafkaHeaderMapper$SimplePatternBasedHeaderMatcher[debug:313] - headerName=[kafka_correlationId] WILL be mapped
    21:38:18.880 [so65866763-0-C-1] DEBUG AbstractKafkaHeaderMapper$SimplePatternBasedHeaderMatcher[debug:313] - headerName=[id] WILL NOT be mapped, matched pat
    21:38:18.880 [so65866763-0-C-1] DEBUG AbstractKafkaHeaderMapper$SimplePatternBasedHeaderMatcher[debug:313] - headerName=[timestamp] WILL NOT be mapped, matc
    21:38:18.881 [so65866763-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {}
    21:38:18.881 [so65866763-0-C-1] ERROR SeekToCurrentErrorHandler[error:149] - Backoff none exhausted for ConsumerRecord(topic = tp-sale.request, partition = ime = 1611956298872, serialized key size = -1, serialized value size = 87, headers = RecordHeaders(headers = [RecordHeader(key = kafka_replyTopic, value = [ 101, 112, 108, 121]), RecordHeader(key = kafka_correlationId, value = [-117, -39, -101, -82, -42, 29, 65, 108, -97, 65, 58, 55, -104, 87, 108, 121])], isReengine.plugin.transactions.factory.SaleRequestFactory@79e222f9)
    org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.apache.kafka.common.errors.SerializationExceptine.plugin.transactions.factory.SaleResponseFactory to class org.engine.plugin.transactions.factory.ResponseFactorySerializer specified in value.serializer
            at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2040)
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
            at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
            at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
            at java.base/java.lang.Thread.run(Thread.java:835)
    Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.engine.plugin.transactions.factory.SaleResponseFactory totory.ResponseFactorySerializer specified in value.serializer
    Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.sprinader @6267c3bb)
            at org.engine.plugin.transactions.factory.ResponseFactorySerializer.serialize(ResponseFactorySerializer.java:10)
            at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
            at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:902)
            at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
            at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:781)
            at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:562)
            at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:401)
            at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.sendReplyForMessageSource(MessagingMessageListenerAdapter.java:517)
            at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.sendSingleResult(MessagingMessageListenerAdapter.java:484)
            at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.sendResponse(MessagingMessageListenerAdapter.java:445)
            at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.handleResult(MessagingMessageListenerAdapter.java:370)
            at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:88)
            at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51)
            at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1992)
            at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1974)
            at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1911)
            at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1851)
            at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1748)
            at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1472)
            at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1135)
            at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
            at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
            at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
            at java.base/java.lang.Thread.run(Thread.java:835)
    21:38:18.882 [so65866763-0-C-1] DEBUG SeekToCurrentErrorHandler[debug:313] - Skipping seek of: ConsumerRecord(topic = tp-sale.request, partition = 0, leader1956298872, serialized key size = -1, serialized value size = 87, headers = RecordHeaders(headers = [RecordHeader(key = kafka_replyTopic, value = [116, 112,, 108, 121]), RecordHeader(key = kafka_correlationId, value = [-117, -39, -101, -82, -42, 29, 65, 108, -97, 65, 58, 55, -104, 87, 108, 121])], isReadOnly = ugin.transactions.factory.SaleRequestFactory@79e222f9)
    21:38:18.882 [so65866763-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {tp-sale.request-0=OffsetAndMetadata{offset=2
    21:38:18.883 [so65866763-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Committing: {tp-sale.request-0=OffsetAndMetadata{offset=28
    ^C21:38:23.083 [SpringContextShutdownHook] DEBUG AnnotationConfigApplicationContext[doClose:1006] - Closing org.springframework.context.annotation.Annotatiod on Fri Jan 29 21:36:41 UTC 2021, parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@67e2d983
    21:38:23.087 [SpringContextShutdownHook] DEBUG DefaultLifecycleProcessor[stop:369] - Stopping beans in phase 2147483547
    21:38:23.095 [so65866763-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {}
    21:38:23.095 [so65866763-0-C-1] INFO  ConsumerCoordinator[invokePartitionsRevoked:292] - [Consumer clientId=consumer-so65866763-1, groupId=so65866763] Revokrequest-0
    21:38:23.096 [so65866763-0-C-1] INFO  KafkaMessageListenerContainer[info:292] - so65866763: partitions revoked: [tp-sale.request-0]
    21:38:23.096 [so65866763-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {}
    21:38:23.097 [so65866763-0-C-1] INFO  AbstractCoordinator[maybeLeaveGroup:979] - [Consumer clientId=consumer-so65866763-1, groupId=so65866763] Member consumfacf6942795 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to the consumer unsubscribed from all topics
    21:38:23.098 [so65866763-0-C-1] INFO  KafkaConsumer[unsubscribe:1082] - [Consumer clientId=consumer-so65866763-1, groupId=so65866763] Unsubscribed all topic
    21:38:23.098 [so65866763-0-C-1] INFO  ThreadPoolTaskScheduler[shutdown:218] - Shutting down ExecutorService
    21:38:23.111 [so65866763-0-C-1] INFO  KafkaMessageListenerContainer$ListenerConsumer[info:292] - so65866763: Consumer stopped
    21:38:23.115 [so65866763-0-C-1] DEBUG KafkaMessageListenerContainer[debug:313] - KafkaMessageListenerContainer [id=so65866763-0, clientIndex=-0, topicPartit
    21:38:23.116 [so65866763-0-C-1] DEBUG DefaultLifecycleProcessor[lambda$doStop$2:242] - Bean 'org.springframework.kafka.config.internalKafkaListenerEndpointR
    21:38:23.119 [SpringContextShutdownHook] INFO  KafkaProducer[close:1182] - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30

Do you know where I'm wrong? I can't find my mistake.

Peter Penzov
  • 1,126
  • 134
  • 430
  • 808
  • I still don't understand why you're struggling with this instead of just using spring-provided JSON serializers, but in any case, for this code, you don't need two different serializers since both go from Object->byte[] – OneCricketeer Jan 30 '21 at 04:39
  • Your mentioned "producer config" mentions consumers and deserializers... Your "consumer config" has a producer factory. This is very confusing, but also indicates why something is misconfigured because the error is that you're casting the wrong class here `authResponseFactory = (AuthResponseFactory) in.readObject();`, and I don't think the linked example used casts, nor closed the stream before reading the object – OneCricketeer Jan 30 '21 at 04:53
  • I want to to use Serialized object for performance reasons. Can you give me a hand please? – Peter Penzov Jan 30 '21 at 09:50
  • To my knowledge, this is not more performant. Review the answer you got previously - there's only one serializer class for both Foo and Bar producers (your stacktrace here is in the producer+serializer, not the consumer) – OneCricketeer Jan 30 '21 at 13:38
  • I tried this attempt https://stackoverflow.com/questions/65969337/return-java-object-to-proper-kafka-producer but unfortunately I get error – Peter Penzov Jan 30 '21 at 14:54
  • 1
    As @OneCricketeer said, you should not be referencing concrete types in the serializer/deserializer; only `Object` as in my example on the other question. – Gary Russell Feb 01 '21 at 16:00
  • I saw where is my mistake. Thanks! – Peter Penzov Feb 01 '21 at 16:04

0 Answers0