I have a set of applications that read from kafka in different consumer groups and now I want to join them in the same consumer group. To do this and avoid having to consume a lot of old data I wanted to define the offsets of the consumer groups.
I found some ways of doing this for example here and here
How to create a new consumer group in kafka
I managed to create my groups and define for example the latest offset or the offset for all topics. Something like this:
alias kcg=/opt/kafka/bin/kafka-consumer-groups.sh
kcg --bootstrap-server localhost:9093 --command-config /opt/kafka/config/consumer.properties --group test-group --reset-offsets --to-latest --all-topics --execute
kcg --bootstrap-server localhost:9093 --command-config /opt/kafka/config/consumer.properties --group test-group --reset-offsets --shift-by -10 --all-topics --execute
Now when I try to define it for example with a simple date it will complain, as expected saying my date is not ISO 8601:
kcg --bootstrap-server localhost:9093 --command-config /opt/kafka/config/consumer.properties --group test-group --reset-offsets --to-datetime "20220330093926Z" --all-topics --execute
Error: Executing consumer group command failed due to Error parsing timestamp. It does not contain a 'T' according to ISO8601 format
java.text.ParseException: Error parsing timestamp. It does not contain a 'T' according to ISO8601 format
at org.apache.kafka.common.utils.Utils.getDateTime(Utils.java:1347)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.prepareOffsetsToReset(ConsumerGroupCommand.scala:811)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$resetOffsets$1(ConsumerGroupCommand.scala:438)
But when I try with a ISO8601 date, I still get an error that it cannot parse the date.
kcg --bootstrap-server localhost:9093 --command-config /opt/kafka/config/consumer.properties --group test-group --reset-offsets --to-datetime "20220330T093926Z" --all-topics --execute
Error: Executing consumer group command failed due to Unparseable date: "2022-03-30T09:39:26Z"
java.text.ParseException: Unparseable date: "2022-03-30T09:39:26Z"
at java.base/java.text.DateFormat.parse(DateFormat.java:395)
at org.apache.kafka.common.utils.Utils.getDateTime(Utils.java:1364)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.prepareOffsetsToReset(ConsumerGroupCommand.scala:811)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$resetOffsets$1(ConsumerGroupCommand.scala:438)
I tried for example and they all got the same error:
2022-03-30T00:01:25+0000
2022-03-30T09:44:25+0000
2022-03-30T09:39:26Z
20220330T093926Z
I did try to define the date in different ways as describe here but I was not very lucky: java.text.ParseException: Unparseable date
Anyone has experience on doing this? And what I might be doing wrong?