
I am using Spring Boot 2.0.2.RELEASE with Spring for Apache Kafka (the effective POM shows spring-kafka 2.1.6.RELEASE).

I have gone from using the regular ByteArrayDeserializer to using Confluent's deserializer:

properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);

As a result, I don't have to fetch the raw bytes and deserialize them into the payload myself. The side effect, however, is that I can no longer read some of the old messages, because their schema is slightly different from the one in the Confluent registry.

So when I start the app, I keep getting this message:

    2018-08-17 17:58:51.360 ERROR 18004 --- [ntainer#0-0-C-1] o.s.k.listener.BatchLoggingErrorHandler  : Error while processing:

    org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition ri00-q-log-et-final-0 at offset 36833. If needed, please seek past the record to continue consumption.
    Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
    Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
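For context, "Unknown magic byte!" means the record value is not in Confluent's wire format (one 0x0 magic byte, a 4-byte big-endian schema id, then the Avro payload), which is exactly what happens with records that were produced before the switch to the Confluent serializer. A minimal sketch (not from the original post) of how that check can be performed on a raw record value:

```java
import java.nio.ByteBuffer;

// Sketch: Confluent's wire format is [magic byte 0x0][4-byte schema id][Avro payload].
// Records produced without the Confluent serializer fail this check, which is
// what the deserializer reports as "Unknown magic byte!".
public class WireFormatCheck {

    static final byte MAGIC_BYTE = 0x0;

    /** Returns true if the raw record value looks like Confluent wire format. */
    static boolean isConfluentWireFormat(byte[] value) {
        return value != null && value.length >= 5 && value[0] == MAGIC_BYTE;
    }

    /** Extracts the schema registry id; only meaningful if the check above passes. */
    static int schemaId(byte[] value) {
        ByteBuffer buffer = ByteBuffer.wrap(value);
        buffer.get(); // skip the magic byte
        return buffer.getInt();
    }

    public static void main(String[] args) {
        byte[] confluentStyle = {0x0, 0, 0, 0, 42, 'p', 'a', 'y'}; // hypothetical record
        byte[] plainBytes = "legacy message".getBytes();
        System.out.println(isConfluentWireFormat(confluentStyle)); // true
        System.out.println(schemaId(confluentStyle));              // 42
        System.out.println(isConfluentWireFormat(plainBytes));     // false
    }
}
```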

So I decided that I have to start listening from the end of the topic. I checked the documentation https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek, which suggests implementing ConsumerSeekAware and its nested interface ConsumerSeekAware.ConsumerSeekCallback.

I have changed the @Service class containing the @KafkaListener method to implement the interfaces mentioned in the documentation:

@Service
public class MyAvroListener implements ConsumerSeekAware.ConsumerSeekCallback, ConsumerSeekAware {

It has the @KafkaListener-annotated method, where I have tried seeking to the end of the partition:

@KafkaListener(topics = "${topic}", containerFactory = "myAvroListenerFactory")
public void listen(final Acknowledgment ack, final List<ConsumerRecord<String, EclLogging>> messages) throws Exception {
    this.seekCallBack.get().seekToEnd(topic, 0);
    try {
        for (ConsumerRecord<String, EclLogging> kafkaRecord : messages) {

I have also tried seeking to a specific offset (because I keep getting stuck at the message at offset 36833):

@KafkaListener(topics = "${topic}", containerFactory = "myAvroListenerFactory")
public void listen(final Acknowledgment ack, final List<ConsumerRecord<String, EclLogging>> messages) throws Exception {
    this.seekCallBack.get().seek(topic, 0, 36900);
    try {
        for (ConsumerRecord<String, EclLogging> kafkaRecord : messages) {

I have implemented the methods from the above interfaces:

private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();

@Override
public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
    this.seekCallBack.set(consumerSeekCallback);
}

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

}

@Override
public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

}

@Override
public void seek(String s, int i, long l) {

}

@Override
public void seekToBeginning(String s, int i) {

}

@Override
public void seekToEnd(String topic, int partition) {
    System.out.println("seekToEnd is hit for topic=" + topic + " and partition=" + partition);
}

When the app comes up, the registerSeekCallback method does get hit, but neither the seekToEnd nor the seek method is ever hit.

And hence I keep getting this message:

    2018-08-17 17:58:51.360 ERROR 18004 --- [ntainer#0-0-C-1] o.s.k.listener.BatchLoggingErrorHandler  : Error while processing:

    org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition ri00-q-log-et-final-0 at offset 36833. If needed, please seek past the record to continue consumption.
    Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
    Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

I am using the snippet from here: Spring Kafka Template implementation example for seek offset, acknowledgement.

As mentioned in What determines Kafka consumer offset?, I cannot use the auto.offset.reset property to start consuming from the end of the topic, because it only applies when the group has no committed offsets (unless I use a different consumerGroupId, which in my case is not possible). I am wondering if I can solve this problem with the existing consumer group.
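To make that constraint concrete, here is a sketch (not from the original post) of the relevant consumer settings; the broker address and group id are hypothetical. The key point is that auto.offset.reset only takes effect for partitions with no committed offset, so with an existing consumer group it will not skip past the undeserializable record:

```java
import java.util.Properties;

// Sketch of the relevant consumer settings, using the standard Kafka
// consumer config keys. Note: auto.offset.reset applies ONLY when the
// group has no committed offset for a partition, so it cannot be used
// to jump an existing group past a bad record.
public class ConsumerProps {

    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("group.id", "my-existing-group");       // hypothetical group id
        props.put("auto.offset.reset", "latest");         // ignored once offsets are committed
        props.put("enable.auto.commit", "false");         // the listener container manages acks
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("auto.offset.reset")); // latest
    }
}
```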

Robin Bajaj

3 Answers


You have a topic to which messages with a different schema have already been sent.

This problem can be solved in multiple ways.

### Deletes all schema versions registered under the subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value
[1]

### Deletes version 1 of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/1
1

### Deletes the most recently registered schema under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/latest

The above APIs are primarily intended for development environments, where it is common to go through several iterations before finalizing a schema. They are not recommended for production, but there are a few scenarios where they can be used there with utmost care:

  • A new schema to be registered has compatibility issues with one of the existing schema versions

  • An old version of the schema needs to be registered again for the same subject

  • The schemas are used only in real-time streaming systems and the older version(s) are no longer required

  • A topic needs to be recycled

It is also important to note that any compatibility settings registered for the subject are also deleted when you delete the subject or its only remaining schema version.

The second approach is to start sending messages to a new topic. Follow these steps and you should be fine.

  • Update the producer to send data to the new topic, which will register the updated schema in the schema registry
  • Verify that lag is zero for all consumers of the old topic
  • Update the consumer to consume data from the new topic
  • Delete the old topic from Kafka
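The lag check in the second step boils down to comparing each partition's end offset with the group's committed offset. A minimal sketch of that arithmetic (in a real application both maps would come from the consumer APIs rather than being hard-coded):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: consumer lag per partition is endOffset - committedOffset;
// the old topic is safe to retire once the total is zero. The maps here
// are hard-coded stand-ins for values a real app would fetch from Kafka.
public class LagCheck {

    static long totalLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committed) {
        long lag = 0;
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            // a partition with no committed offset counts as fully lagging
            lag += e.getValue() - committed.getOrDefault(e.getKey(), 0L);
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = new HashMap<>();
        Map<Integer, Long> committed = new HashMap<>();
        end.put(0, 36900L);       // hypothetical end offset for partition 0
        committed.put(0, 36900L); // consumer group has caught up
        System.out.println(totalLag(end, committed)); // 0 -> safe to switch topics
    }
}
```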
OneCricketeer

You are performing the seek too late - after the poll() that fetches the records; you need to perform the seek(s) in

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

}

by calling consumerSeekCallback.seekToEnd(...) there. The seek will happen before the poll() to fetch the record(s).
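A minimal sketch of that idea follows. The method shape matches spring-kafka's ConsumerSeekAware, but the two small nested types are stand-ins (stubs) so the example is self-contained and compilable without the framework on the classpath:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SeekOnAssign {

    // Stand-ins for org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback
    // and org.apache.kafka.common.TopicPartition, so this sketch compiles on its own.
    interface ConsumerSeekCallback {
        void seekToEnd(String topic, int partition);
    }

    static class TopicPartition {
        private final String topic;
        private final int partition;
        TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
        String topic() { return topic; }
        int partition() { return partition; }
    }

    // The seek happens here, on assignment, BEFORE the first poll() -- not in
    // the @KafkaListener method, which only runs after poll() has already
    // tried (and failed) to deserialize the record. Use the callback argument.
    static void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        for (TopicPartition tp : assignments.keySet()) {
            callback.seekToEnd(tp.topic(), tp.partition());
        }
    }

    public static void main(String[] args) {
        Map<TopicPartition, Long> assignments = new LinkedHashMap<>();
        assignments.put(new TopicPartition("ri00-q-log-et-final", 0), 36833L); // hypothetical assignment
        onPartitionsAssigned(assignments, (topic, partition) ->
                System.out.println("seekToEnd " + topic + "-" + partition));
    }
}
```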

You can also use the kafka-consumer-groups command-line tool to set arbitrary offsets for a group/topic/partition.

The current Boot release is 2.0.4, which brings in spring-kafka 2.1.8.

Also, you shouldn’t implement the callback interface; the callback is passed to you.

The documentation seems clear...

When using group management, the second method is called when assignments change. You can use this method, for example, for setting initial offsets for the partitions, by calling the callback; you must use the callback argument, not the one passed into registerSeekCallback.

...if not, what should we change?

Gary Russell

Based on @Gary Russell's feedback, I made the following changes to my code to get it to work. Thanks @Gary and @Manjeet/@cricket_007.

So I did the following and it worked.

Basically no change to the method annotated with @KafkaListener, but the class containing it has to implement these interfaces:

MyKafkaListenerClass implements ConsumerSeekAware.ConsumerSeekCallback, ConsumerSeekAware

and then in that class I implement the methods from these interfaces:

private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();

@Override
public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
    this.seekCallBack.set(consumerSeekCallback);
}

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
    // Per the documentation quoted above, use the callback argument here
    // (not the ThreadLocal one), and seek on each assigned partition.
    for (TopicPartition topicPartition : map.keySet()) {
        consumerSeekCallback.seekToEnd(topicPartition.topic(), topicPartition.partition());
    }
}

@Override
public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

}

@Override
public void seek(String topic, int partition, long offset) {
    System.out.println("seek is hit for topic=" + topic + " and partition=" + partition + " and offset=" + offset);
}

@Override
public void seekToBeginning(String s, int i) {

}

@Override
public void seekToEnd(String topic, int partition) {
    System.out.println("seekToEnd is hit for topic=" + topic + " and partition=" + partition);
}
Robin Bajaj