As Gary Russell pointed out above, ConsumerSeekCallback is legacy, so it's a no-go ... and I won't open a GitHub issue ...
I was finally able to achieve my objective:
When a user logs in, she is supposed to see all the events she
missed out on since her last session.
by handling all new subscriptions in an EventListener for the ListenerContainerIdleEvent, where the consumer is available as part of the event data:
    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void idleEventHandler(ListenerContainerIdleEvent event) {
        // find new subscriptions
        Collection<EventListenerSubscription> newSubscriptions =
                subscriptions.stream().filter(s -> s.isNew())
                        .collect(Collectors.toList());
        if (!newSubscriptions.isEmpty()) {
            // mark subscriptions as not new
            newSubscriptions.forEach(s -> s.setNew(false));
            // compute the oldest time stamp
            OptionalLong oldestTimeStamp =
                    newSubscriptions.stream()
                            .mapToLong(s -> s.getLastTimeStamp())
                            .min();
            if (oldestTimeStamp.isPresent()) {
                // seek on topic for oldest time stamp
                Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
                timestampsToSearch.put(new TopicPartition(eventTopic, 0),
                        oldestTimeStamp.getAsLong());
                Consumer<?, ?> consumer = event.getConsumer();
                consumer.offsetsForTimes(timestampsToSearch).forEach((k, v) -> {
                    // v is null if the partition has no record at or after the time stamp
                    if (v != null) {
                        consumer.seek(k, v.offset());
                    }
                });
            }
        }
    }
I determine the oldest time stamp across all new subscriptions, mark these subscriptions as not new, and use the consumer to seek back on the topic to the offset for that time stamp.
In order to get the container idle event, the idle interval has to be configured in the container properties, as described here.
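For illustration, a minimal sketch of that configuration, assuming a Java-config container factory bean. The class name `KafkaIdleConfig`, the generic types, and the 5-second interval are my own placeholders and not part of the original setup:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaIdleConfig { // hypothetical class name

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Publish a ListenerContainerIdleEvent whenever no records have arrived
        // for 5 seconds (the value is an arbitrary example).
        factory.getContainerProperties().setIdleEventInterval(5_000L);
        return factory;
    }
}
```

Note that with this approach a new subscriber is only picked up once the container has been idle for that interval, so a shorter interval means a shorter wait before the replay starts.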
The KafkaListener will then take care of sending the old events to the (formerly new) subscribers:
    @KafkaListener(id = "qux", topics = { "${app.event.topic}" }, errorHandler = "kafkaListenerErrorHandler")
    public void receive(@Payload Event event, @Headers MessageHeaders headers) throws JsonProcessingException {
        // time stamp of the received record (assumed here to be taken from the Kafka header)
        long timestamp = headers.get(KafkaHeaders.RECEIVED_TIMESTAMP, Long.class);
        // collect the subscribers not marked as new
        Collection<EventListenerSubscription> oldSubscriptions =
                subscriptions.stream().filter(s -> !s.isNew())
                        .collect(Collectors.toList());
        for (EventListenerSubscription s : oldSubscriptions) {
            // skip events the subscriber has already seen
            if (s.getLastTimeStamp() < timestamp) {
                s.addMessage(event, timestamp);
            }
        }
    }
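Because the seek goes back to the oldest time stamp of all new subscriptions, subscribers that are further along see some records a second time. The time stamp check in the listener is what filters those out. Here is a small plain-Java sketch of that effect, without Kafka. The `Subscription` class is a hypothetical stand-in for EventListenerSubscription, and I am assuming that addMessage also advances the subscriber's last time stamp:

```java
import java.util.ArrayList;
import java.util.List;

public class ReplayFilterSketch {

    /** Hypothetical stand-in for EventListenerSubscription. */
    static final class Subscription {
        private long lastTimeStamp;
        private final List<String> delivered = new ArrayList<>();

        Subscription(long lastTimeStamp) {
            this.lastTimeStamp = lastTimeStamp;
        }

        long getLastTimeStamp() {
            return lastTimeStamp;
        }

        // Assumption: storing a message also advances the last time stamp.
        void addMessage(String event, long timestamp) {
            delivered.add(event);
            lastTimeStamp = timestamp;
        }

        List<String> getDelivered() {
            return delivered;
        }
    }

    /** Same check as in the listener: deliver only events newer than the subscriber's last one. */
    static void receive(List<Subscription> subscriptions, String event, long timestamp) {
        for (Subscription s : subscriptions) {
            if (s.getLastTimeStamp() < timestamp) {
                s.addMessage(event, timestamp);
            }
        }
    }

    public static void main(String[] args) {
        Subscription alice = new Subscription(100L); // last saw the event at t=100
        Subscription bob = new Subscription(300L);   // last saw the event at t=300
        List<Subscription> subscriptions = List.of(alice, bob);

        // Simulate the replay after seeking back to the oldest time stamp (100).
        long[] timestamps = {100L, 200L, 300L, 400L};
        for (long t : timestamps) {
            receive(subscriptions, "e" + t, t);
        }

        System.out.println("alice: " + alice.getDelivered()); // alice: [e200, e300, e400]
        System.out.println("bob: " + bob.getDelivered());     // bob: [e400]
    }
}
```

Each subscriber only receives the events it has not seen yet, even though the replay starts at the oldest time stamp.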