
We are using spring-kafka to consume messages that have to be forwarded to the frontend as server-sent events (SSE).

When a user logs in, she is supposed to see all the events she missed since her last session.

The current implementation uses the ConsumerSeekCallback, as described in this answer.

However, that callback does not expose the offsetsForTimes method of the underlying KafkaConsumer (KafkaConsumer#offsetsForTimes).

So I have to use seekToBeginning and filter by timestamp, which will cause problems when there are a lot of messages ...

Is there any other way to receive only the messages since a given time stamp? Maybe a safe way to use the consumer directly?
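For reference, the current workaround looks roughly like this (a minimal sketch against the spring-kafka 2.2 API; the class name is made up):

```java
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerSeekAware;

// Sketch of the current workaround: rewind every assigned partition to the
// beginning and filter by timestamp later, in the listener itself.
public class SeekToBeginningListener implements ConsumerSeekAware {

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        // not needed for this approach
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {
        // the callback offers seek/seekToBeginning/seekToEnd,
        // but nothing timestamp-based like offsetsForTimes
        assignments.keySet().forEach(tp ->
                callback.seekToBeginning(tp.topic(), tp.partition()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {
        // not needed for this approach
    }
}
```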

Gregor Ophey

2 Answers


Spring Kafka 2.0 introduced the ConsumerAwareRebalanceListener (the current version is 2.2.2).

See How to test a ConsumerAwareRebalanceListener? for an example.
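A hedged sketch of what that route might look like (the class name, `startTimestamp` field, and wiring are assumptions, not part of the answer above):

```java
import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;

// On partition assignment, look up the offsets for a cut-off timestamp
// and seek each partition there; runs on the consumer thread.
public class TimestampRebalanceListener implements ConsumerAwareRebalanceListener {

    private final long startTimestamp; // e.g. the user's last-seen time (assumption)

    public TimestampRebalanceListener(long startTimestamp) {
        this.startTimestamp = startTimestamp;
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer,
            Collection<TopicPartition> partitions) {
        Map<TopicPartition, Long> timestamps = partitions.stream()
                .collect(Collectors.toMap(tp -> tp, tp -> startTimestamp));
        consumer.offsetsForTimes(timestamps).forEach((tp, offsetAndTs) -> {
            if (offsetAndTs != null) { // null when no message at/after the timestamp
                consumer.seek(tp, offsetAndTs.offset());
            }
        });
    }
}
```

The listener would be registered via `containerProperties.setConsumerRebalanceListener(...)`.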

Gary Russell
  • not really what I was looking for, but thanks anyway for pointing out that the consumer is available in some parts of the API. I am now using a [ConsumerAwareListenerErrorHandler](https://docs.spring.io/spring-kafka/docs/2.2.2.RELEASE/api/org/springframework/kafka/listener/ConsumerAwareListenerErrorHandler.html) to deal with the offsetForTimes part of my problem. – Gregor Ophey Dec 07 '18 at 16:51
  • Do you think it would be feasible to add offsetForTimes to the [ConsumerSeekCallback](https://docs.spring.io/spring-kafka/docs/2.2.2.RELEASE/api/index.html?org/springframework/kafka/listener/ConsumerSeekAware.ConsumerSeekCallback.html). – Gregor Ophey Dec 07 '18 at 16:54
  • 1
    It's certainly feasible, but `ConsumerSeekCallback` is considered legacy since we now have consumer-aware listeners, error handlers and the container idle events also provide access to the consumer (all invoked on the consumer thread - for thread safety). If you have a compelling reason to extend the callback, open a GitHub issue. – Gary Russell Dec 07 '18 at 18:17
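The ConsumerAwareListenerErrorHandler route mentioned in the first comment might be wired up like this (a hedged sketch; the bean name matches the `errorHandler` attribute used in the answer below, the seek logic itself is omitted):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;

@Configuration
public class ErrorHandlerConfig {

    @Bean
    public ConsumerAwareListenerErrorHandler kafkaListenerErrorHandler() {
        return (message, exception, consumer) -> {
            // The handler is invoked on the consumer thread, so the consumer
            // can be used directly, e.g. for offsetsForTimes/seek; the actual
            // recovery logic is application-specific and omitted here.
            return null; // error handled, no reply payload
        };
    }
}
```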

As Gary Russell pointed out above, ConsumerSeekCallback is legacy, so it's a no-go ... and I won't open a GitHub issue ...

I was finally able to achieve my objective:

When a user logs in, she is supposed to see all the events she missed since her last session.

by handling all new subscriptions in an EventListener for the ListenerContainerIdleEvent, where the consumer is available as part of the event data:

    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void idleEventHandler(ListenerContainerIdleEvent event) {

        // find new subscriptions
        Collection<EventListenerSubscription> newSubscriptions = 
                subscriptions.stream().filter(s -> s.isNew())
                .collect(Collectors.toList());

        if (!newSubscriptions.isEmpty()) {

            // mark subscriptions as not new
            newSubscriptions.forEach(s -> s.setNew(false));

            // compute the oldest time stamp
            OptionalLong oldestTimeStamp = 
                    newSubscriptions.stream()
                    .mapToLong(s -> s.getLastTimeStamp())
                    .reduce(Long::min);

            if (oldestTimeStamp.isPresent()) {

                // seek on topic for oldest time stamp
                Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
                timestampsToSearch.put(new TopicPartition(eventTopic, 0),
                                       oldestTimeStamp.getAsLong());
                Consumer<?, ?> consumer = event.getConsumer();
                consumer.offsetsForTimes(timestampsToSearch).forEach((k, v) -> {
                    if (v != null) { // null when no message at or after the timestamp
                        consumer.seek(k, v.offset());
                    }
                });
            }
        }
    }

I determine the oldest timestamp across all new subscriptions, mark these subscriptions as no longer new, and use the consumer to seek back on the topic to that oldest timestamp.

In order to get the container idle event, the idle interval has to be configured in the container properties, as described here.
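The factory configuration might look like this (a sketch; the bean and generic types are assumptions). Without an idle event interval, no ListenerContainerIdleEvent is ever published:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Event> kafkaListenerContainerFactory(
        ConsumerFactory<String, Event> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, Event> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // publish a ListenerContainerIdleEvent after 5s without incoming records
    factory.getContainerProperties().setIdleEventInterval(5000L);
    return factory;
}
```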

The KafkaListener will then take care of sending the old events to the (formerly new) subscribers:

    @KafkaListener(id = "qux", topics = { "${app.event.topic}" }, errorHandler = "kafkaListenerErrorHandler")
    public void receive(@Payload Event event,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
            @Headers MessageHeaders headers) throws JsonProcessingException {

        // collect the subscribers not marked as new
        Collection<EventListenerSubscription> oldSubscriptions = 
                subscriptions.stream().filter(s -> !s.isNew())
                .collect(Collectors.toList());

        for (EventListenerSubscription s : oldSubscriptions) {
            if (s.getLastTimeStamp() < timestamp) {
                s.addMessage(event, timestamp);
            }
        }
    }
Gregor Ophey