
I have a set of applications that read from Kafka in different consumer groups, and I now want to move them into the same consumer group. To do this without having to consume a lot of old data, I want to set the offsets of the new consumer group myself.

I found some ways of doing this, for example here and here.

How to create a new consumer group in kafka

I managed to create my groups and set, for example, the latest offset or a shifted offset for all topics. Something like this:

alias kcg=/opt/kafka/bin/kafka-consumer-groups.sh

kcg --bootstrap-server localhost:9093 --command-config /opt/kafka/config/consumer.properties --group test-group --reset-offsets --to-latest --all-topics --execute

kcg --bootstrap-server localhost:9093 --command-config /opt/kafka/config/consumer.properties --group test-group --reset-offsets --shift-by -10 --all-topics --execute

Now, when I try to set it with a simple date, it complains, as expected, that my date is not ISO 8601:

kcg --bootstrap-server localhost:9093 --command-config /opt/kafka/config/consumer.properties --group test-group --reset-offsets --to-datetime "20220330093926Z" --all-topics --execute

Error: Executing consumer group command failed due to Error parsing timestamp. It does not contain a 'T' according to ISO8601 format
java.text.ParseException: Error parsing timestamp. It does not contain a 'T' according to ISO8601 format
        at org.apache.kafka.common.utils.Utils.getDateTime(Utils.java:1347)
        at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.prepareOffsetsToReset(ConsumerGroupCommand.scala:811)
        at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$resetOffsets$1(ConsumerGroupCommand.scala:438)

But when I try with an ISO 8601 date, I still get an error saying that it cannot parse the date:

kcg --bootstrap-server localhost:9093 --command-config /opt/kafka/config/consumer.properties --group test-group --reset-offsets --to-datetime "2022-03-30T09:39:26Z" --all-topics --execute
Error: Executing consumer group command failed due to Unparseable date: "2022-03-30T09:39:26Z"
java.text.ParseException: Unparseable date: "2022-03-30T09:39:26Z"
        at java.base/java.text.DateFormat.parse(DateFormat.java:395)
        at org.apache.kafka.common.utils.Utils.getDateTime(Utils.java:1364)
        at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.prepareOffsetsToReset(ConsumerGroupCommand.scala:811)
        at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$resetOffsets$1(ConsumerGroupCommand.scala:438)

I tried, for example, the following values, and they all produced the same error:

2022-03-30T00:01:25+0000
2022-03-30T09:44:25+0000    
2022-03-30T09:39:26Z    
20220330T093926Z

I did try to define the date in different ways, as described here, but I was not very lucky: java.text.ParseException: Unparseable date

Does anyone have experience doing this? What might I be doing wrong?

Miguel Costa
  • Read the Aiven link you've posted again. No Z or timezone offset needs to be provided. All times in Kafka are treated as UTC. The end of the format should be `SS.sss` to account for milliseconds. – OneCricketeer Mar 30 '22 at 13:20
  • I agree, the date format should be 'YYYY-MM-DDTHH:mm:SS.sss'. – Niranjan Mar 30 '22 at 13:30
  • I think I tried that as well, but I will check and confirm. Thank you. – Miguel Costa Mar 30 '22 at 15:30
  • @Niranjan, thank you for the feedback. It was a really basic mistake; if you want to post an answer, this is obviously the correct response. I was also misled a bit by reading the Confluence page on ISO, which had examples with the Z for UTC, and by https://www.utctime.net/, so I mixed the two, which was obviously wrong: 20220330T175526Z – Miguel Costa Mar 30 '22 at 19:53

2 Answers


Thanks for the feedback in the comments. I made a really basic mistake: I thought I had tested without the Z, but somehow I had not. Something like "2022-03-30T01:01:01.001" works.

Miguel Costa

Add the decimal seconds: 2022-03-30T09:44:25.000
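If it helps, here is a minimal sketch of how the timestamp could be generated instead of typed by hand. The helper name `to_kafka_datetime` is made up for illustration, it assumes GNU `date` (the `-d @EPOCH` syntax is not portable to BSD/macOS), and it simply hard-codes the milliseconds to `.000`. The script only prints the reset command with `--dry-run` rather than running it, reusing the paths and group name from the question.

```shell
#!/usr/bin/env bash
# Hypothetical helper: turn a Unix epoch (in seconds) into the
# 'YYYY-MM-DDTHH:mm:SS.sss' form suggested in this thread.
# Assumes GNU date; the time is rendered in UTC and milliseconds are fixed to .000.
to_kafka_datetime() {
  date -u -d "@$1" +"%Y-%m-%dT%H:%M:%S.000"
}

ts="$(to_kafka_datetime 1648633166)"
echo "$ts"   # 2022-03-30T09:39:26.000

# Print the command for review instead of executing it.
# --dry-run shows the planned offsets; swap it for --execute once they look right.
echo "/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9093" \
     "--command-config /opt/kafka/config/consumer.properties --group test-group" \
     "--reset-offsets --to-datetime \"$ts\" --all-topics --dry-run"
```

Printing the command first is a deliberate choice here: a reset changes committed offsets for the whole group, so it seems safer to check the generated timestamp before anything is applied.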