I am using Spring Boot 2.0.2.RELEASE with Spring for Apache Kafka (the effective POM shows version 2.1.6.RELEASE for spring-kafka).
I have switched from the plain ByteArrayDeserializer to Confluent's deserializer:

properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);

As a result, I no longer have to fetch the raw bytes and deserialize them into the payload myself. But the side effect is that I can no longer read some of the old messages, because their schema is slightly different from the one in the Confluent registry.
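For completeness, the relevant consumer configuration looks roughly like this (a sketch using string keys so it stands alone; the registry URL is a placeholder, and my actual key deserializer may differ):

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    public static void main(String[] args) {
        // Sketch of the relevant consumer properties; the registry URL is a placeholder.
        Properties props = new Properties();
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081"); // placeholder
        props.put("specific.avro.reader", "true"); // map records onto generated Avro classes
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```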
So when I start the app, I keep getting this message:
2018-08-17 17:58:51.360 ERROR 18004 --- [ntainer#0-0-C-1] o.s.k.listener.BatchLoggingErrorHandler : Error while processing:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition ri00-q-log-et-final-0 at offset 36833. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
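For context on the "Unknown magic byte!" error: KafkaAvroDeserializer expects the Confluent wire format, i.e. a 0x0 magic byte followed by a 4-byte big-endian schema id before the Avro payload, and a record written without that framing fails the very first check. A minimal sketch of that framing check (my own illustration, not Confluent's code):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class WireFormatSketch {
    // Confluent wire format: 1 magic byte (0x0) + 4-byte big-endian schema id + Avro payload.
    static boolean hasConfluentFraming(byte[] value) {
        return value != null && value.length >= 5 && value[0] == 0x0;
    }

    static int schemaId(byte[] value) {
        return ByteBuffer.wrap(value, 1, 4).getInt();
    }

    public static void main(String[] args) {
        byte[] framed = {0, 0, 0, 0, 42, 1, 2, 3}; // hypothetical record with schema id 42
        byte[] plain = "raw-avro-without-framing".getBytes(StandardCharsets.UTF_8);
        System.out.println(hasConfluentFraming(framed) + " " + schemaId(framed));
        System.out.println(hasConfluentFraming(plain));
    }
}
```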
So I decided that I have to start listening from the end of the topic. I checked the documentation https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek, which suggests implementing ConsumerSeekAware and its nested interface ConsumerSeekAware.ConsumerSeekCallback.
I changed the @Service class containing the @KafkaListener method to implement the interfaces mentioned in the documentation:
@Service
public class MyAvroListener implements ConsumerSeekAware.ConsumerSeekCallback, ConsumerSeekAware {
It has the @KafkaListener-annotated method, where I have tried seeking to the end of the partition:
@KafkaListener(topics = "${topic}", containerFactory = "myAvroListenerFactory")
public void listen(final Acknowledgment ack, final List<ConsumerRecord<String, EclLogging>> messages) throws Exception {
    this.seekCallBack.get().seekToEnd(topic, 0);
    try {
        for (ConsumerRecord<String, EclLogging> kafkaRecord : messages) {
I have also tried seeking to a specific offset (because I keep getting stuck at the message at offset 36833):
@KafkaListener(topics = "${topic}", containerFactory = "myAvroListenerFactory")
public void listen(final Acknowledgment ack, final List<ConsumerRecord<String, EclLogging>> messages) throws Exception {
    this.seekCallBack.get().seek(topic, 0, 36900);
    try {
        for (ConsumerRecord<String, EclLogging> kafkaRecord : messages) {
I have implemented the methods from the interfaces above:
private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();

@Override
public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
    this.seekCallBack.set(consumerSeekCallback);
}

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
}

@Override
public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
}

@Override
public void seek(String topic, int partition, long offset) {
}

@Override
public void seekToBeginning(String topic, int partition) {
}

@Override
public void seekToEnd(String topic, int partition) {
    System.out.println("seekToEnd is hit for topic = " + topic + " and partition = " + partition);
}
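If I understand the pattern correctly, the container registers its own callback in registerSeekCallback, so invoking seekCallBack.get().seekToEnd(...) should run the container's seek logic rather than my seekToEnd override above. A stdlib-only sketch of the flow I expect, with a hypothetical SeekCallback interface standing in for Spring Kafka's ConsumerSeekCallback:

```java
// Hypothetical stand-in for Spring Kafka's ConsumerSeekAware.ConsumerSeekCallback
interface SeekCallback {
    void seekToEnd(String topic, int partition);
}

public class SeekCallbackSketch {
    private static final ThreadLocal<SeekCallback> seekCallBack = new ThreadLocal<>();

    public static void main(String[] args) {
        // What the container does on the consumer thread at startup:
        seekCallBack.set((topic, partition) ->
                System.out.println("container seeks to end of " + topic + "-" + partition));

        // What the listener does later on the same thread:
        seekCallBack.get().seekToEnd("ri00-q-log-et-final", 0);
    }
}
```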
When the app comes up, the registerSeekCallback method does get hit, but neither seekToEnd nor seek is ever invoked, and hence I keep getting this message:
2018-08-17 17:58:51.360 ERROR 18004 --- [ntainer#0-0-C-1] o.s.k.listener.BatchLoggingErrorHandler : Error while processing:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition ri00-q-log-et-final-0 at offset 36833. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
I am using the snippet from here: Spring Kafka Template implementation example for seek offset, acknowledgement.
As mentioned here, What determines Kafka consumer offset?, I cannot use the auto.offset.reset property to start consuming from the end of the topic (unless I use a different consumer group id, which in my case is not possible). I am wondering whether I can solve this problem with the existing consumer group.