
Following the suggestions given in the answers to my previous question, How to test a ConsumerAwareRebalanceListener?, I was able to set up a ConsumerAwareRebalanceListener using Spring Kafka 2.1.x.

However, the following code

Map<TopicPartition, Long> queries = new HashMap<>();
final Long rewindTime = getRewindTime();
for (TopicPartition partition : partitions) {
    queries.put(partition, rewindTime);
}
Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(queries);
result.forEach((key, value) -> consumer.seek(key, value.offset()));

throws a NullPointerException when offsetsForTimes finds no matching offset for a partition. In that case, the value mapped to that partition in result is null. The stack trace is the following.

java.lang.NullPointerException: null
    at org.rcardin.MyConsumerAwareRebalancedListener.lambda$onPartitionsAssigned$0(MyConsumerAwareRebalancedListener.java:40) ~[classes/:na]
    at java.util.HashMap.forEach(HashMap.java:1288) ~[na:1.8.0_144]
    at org.rcardin.MyConsumerAwareRebalancedListener.onPartitionsAssigned(MyConsumerAwareRebalancedListener.java:40) ~[classes/:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$1.onPartitionsAssigned(KafkaMessageListenerContainer.java:596) ~[spring-kafka-2.1.10.RELEASE.jar:2.1.10.RELEASE]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264) [kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367) [kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) [kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295) [kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146) [kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111) [kafka-clients-1.0.2.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:700) [spring-kafka-2.1.10.RELEASE.jar:2.1.10.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_144]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_144]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]

I am trying to write a test that verifies this behavior. However, since the exception is thrown on a thread other than the main test thread, I cannot catch it from the test.

Any suggestions?

Thanks a lot,

riccardo.cardin

1 Answer


According to the offsetsForTimes() Javadocs, this looks like a bug in your code:

 * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
 *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
 *         such message.

So you should definitely guard your value.offset() call with a null check:

result.entrySet()
            .stream()
            .filter(e -> e.getValue() != null)
            .forEach(e -> consumer.seek(e.getKey(), e.getValue().offset()));
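
The guard itself can be checked without a broker or a listener container thread. The following is only a minimal sketch under stated assumptions, not Spring Kafka or Kafka client API: plain String and Long stand in for TopicPartition and the offset, the seekNonNull helper is hypothetical, and a BiConsumer takes the place of consumer.seek. It shows that entries with a null value are skipped instead of causing a NullPointerException.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class NullSafeSeek {

    // Hypothetical helper: invokes seek only for partitions that have an offset.
    // Returns the partitions that were skipped because their value was null.
    public static <K> List<K> seekNonNull(Map<K, Long> offsets, BiConsumer<K, Long> seek) {
        List<K> skipped = new ArrayList<>();
        for (Map.Entry<K, Long> e : offsets.entrySet()) {
            if (e.getValue() == null) {
                skipped.add(e.getKey());
            } else {
                seek.accept(e.getKey(), e.getValue());
            }
        }
        return skipped;
    }

    public static void main(String[] args) {
        // Stand-in for the map returned by offsetsForTimes (assumption:
        // String keys instead of TopicPartition, Long instead of OffsetAndTimestamp).
        Map<String, Long> result = new HashMap<>();
        result.put("topic-0", 42L);
        result.put("topic-1", null); // no message at or after the timestamp

        // Records the seek calls instead of talking to a real consumer.
        Map<String, Long> sought = new HashMap<>();
        List<String> skipped = seekNonNull(result, sought::put);

        System.out.println("sought=" + sought + " skipped=" + skipped);
    }
}
```

Returning the skipped partitions leaves it to the caller to decide what to do with them, for example to log them.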
Artem Bilan