I have a Kafka topic and a consumer with an assigned consumer group (a must) in a Spring Cloud application. As a requirement, on every application restart I need to read all received messages again from the beginning. This was supposed to be achievable with the resetOffsets property, but it is clear from this issue that it currently doesn't work.

I found this workaround for the Kafka consumer API, which suggests assigning a new random name to the consumer group on every restart as a way to start reading from the earliest offset. Is this possible/recommended in Spring Cloud Stream? How could I define a dynamic name for a consumer group?

jesantana

3 Answers

Yes, it would work with SCSt too but, as you say, it's a bit tricky to set a random group id, although you could set it as a system property before launching the SpringApplication.
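
A minimal sketch of the system-property approach, assuming the binding is named `input` (so the property is `spring.cloud.stream.bindings.input.group`). The class `RandomGroupId`, its method, and the `replay-` prefix are hypothetical names for illustration, and the call to `SpringApplication.run` is left as a comment so the snippet compiles on its own:

```java
import java.util.UUID;

class RandomGroupId {

    // Sets the Spring Cloud Stream group property for the given binding to a
    // fresh random value and returns that value. The binding name and the
    // "replay-" prefix are assumptions, not anything mandated by the framework.
    public static String assignRandomGroup(String bindingName) {
        String group = "replay-" + UUID.randomUUID();
        System.setProperty("spring.cloud.stream.bindings." + bindingName + ".group", group);
        return group;
    }

    public static void main(String[] args) {
        String group = assignRandomGroup("input");
        System.out.println("Consumer group for this run: " + group);
        // In a real application, launch Spring only after the property is set:
        // SpringApplication.run(MyApplication.class, args);
    }
}
```

Because every run creates a brand-new group, old groups are left behind on the broker until their offsets expire, which is part of why this is usually treated as a workaround rather than a design.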

If you were using spring-kafka directly, it would be easy: just implement ConsumerSeekAware and call seekToBeginning when the partitions are assigned.

However, with SCSt, you don't have direct access to the listener.

One workaround would be to manually do the seek before you launch the SpringApplication by creating a consumer with the same group id. It gets a bit tricky, though, if you have multiple instances of your app, because you might get different partitions each time.

We'll look again at fixing that issue (I just made a comment on it).

Gary Russell

You have a few options if you require the application to restart from the beginning every time:

  1. You can reset the committed offset to earliest before restarting the application using the kafka-consumer-groups.sh tool (kafka.admin.ConsumerGroupCommand.scala)

  2. Upon restarting, the application can seek to the beginning and manually commit offset 0. If auto.offset.reset is set to earliest, it will restart from the beginning even if 0 is not a valid offset.

  3. You can use a different consumer group.id value every time. In your Consumer Configuration bean, insert in the Properties object something like:

    properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    

Finally, do you use the committed offsets at all? If not, just set enable.auto.commit to false; the application will then always follow the auto.offset.reset setting.
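
As a rough sketch of that last suggestion, the consumer properties could look like the following. It uses a plain `java.util.Properties` with the literal Kafka config keys (the string values behind `ConsumerConfig.GROUP_ID_CONFIG` and its siblings) so it compiles without the Kafka client on the classpath. The class name, the group name `my-fixed-group`, and the broker address are placeholders:

```java
import java.util.Properties;

class ReplayConsumerConfig {

    // Builds consumer properties for a fixed group that never auto-commits.
    public static Properties replayProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Never commit offsets automatically, so the group keeps no stored position...
        props.put("enable.auto.commit", "false");
        // ...and with no committed offset, the consumer falls back to this policy.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        replayProperties("localhost:9092", "my-fixed-group")
                .forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

This only holds as long as nothing else commits offsets for the group, including any framework listener container that commits on the application's behalf.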

Options 1 and 2 are usually preferred, as they keep a consistent group.id, which makes it easy to add consumer instances to the group and to monitor the group.

Mickael Maison

In spring-kafka, if you're configuring the consumer via configuration files such as application.yaml (instead of programmatically), you can achieve this with SpEL (Spring Expression Language) by providing a UUID-based consumer group on each execution, plus the earliest offset:

    # consume-all-configuration
    auto-offset-reset: earliest
    group: consumer-local-#{ T(java.util.UUID).randomUUID().toString() }

wideawakening