I am trying to fetch only newly created messages from a reactive MongoDB repository using Spring Data.
The client receives the messages via SSE. I am using an "after" query, which should return only messages that were sent after "LocalDateTime.now()".
Unfortunately, the SSE stream also pushes out messages that are older than "now". I have no clue why it returns those old messages.
My controller method:
@GetMapping(value = "/receiving-sse", produces = "text/event-stream")
public Flux<Message> streamEvents() {
    Mono<String> username = getUsernameFromAuth();
    Flux<Message> message = findOrCreateUser(username)
            .flatMapMany(user -> messageRepository
                    .findAllBySenderIdOrReceiverIdAndSentAtAfter(user.getId(), user.getId(), LocalDateTime.now()));
    Flux<Message> heartBeat = Flux.interval(Duration.ofSeconds(30)).map(sequence -> {
        Message heartBeatMessage = new Message();
        heartBeatMessage.setHeartbeat(true);
        return heartBeatMessage;
    });
    return Flux.merge(message, heartBeat);
}
My repository:
public interface MessageRepository extends ReactiveMongoRepository<Message, String> {
    Flux<Message> findAllByReceiverId(String receiverId);

    @Tailable
    Flux<Message> findAllBySenderIdOrReceiverIdAndSentAtAfter(String senderId, String receiverId, LocalDateTime sentAt);

    Flux<Message> findAllBySenderId(String senderId);

    Flux<Message> findAllByIdIn(Collection<String> ids);
}
And my document:
@Data
@Document
public class Message {
    private String id;
    private LocalDateTime sentAt;
    private String message;
    private boolean heartbeat;
    @DBRef
    private User sender;
    @DBRef
    private User receiver;
}
Any hints on why the repository returns messages whose "sentAt" is older than "LocalDateTime.now()" are much appreciated.
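
To make the expected behaviour concrete, here is a minimal plain-Java sketch (no Spring or MongoDB involved) of the filter I expect the query to apply, next to the other way the three conditions could be grouped. The names ExpectedFilter, Msg, expected and alternative are hypothetical and exist only for this illustration; Msg flattens the sender and receiver references to plain id strings. As far as I understand, derived query names are split on "Or" before "And", which would correspond to the second grouping, but I have not verified that this is what happens in my case.

```java
import java.time.LocalDateTime;

public class ExpectedFilter {

    // Hypothetical stand-in for the Message document, with ids as plain strings.
    static class Msg {
        final String senderId;
        final String receiverId;
        final LocalDateTime sentAt;

        Msg(String senderId, String receiverId, LocalDateTime sentAt) {
            this.senderId = senderId;
            this.receiverId = receiverId;
            this.sentAt = sentAt;
        }
    }

    // Grouping I expect: (sender OR receiver) AND sentAt after the cutoff.
    static boolean expected(Msg m, String userId, LocalDateTime cutoff) {
        return (userId.equals(m.senderId) || userId.equals(m.receiverId))
                && m.sentAt.isAfter(cutoff);
    }

    // Other possible grouping: sender OR (receiver AND sentAt after the cutoff).
    static boolean alternative(Msg m, String userId, LocalDateTime cutoff) {
        return userId.equals(m.senderId)
                || (userId.equals(m.receiverId) && m.sentAt.isAfter(cutoff));
    }

    public static void main(String[] args) {
        LocalDateTime cutoff = LocalDateTime.of(2020, 1, 1, 12, 0);
        // A message the user sent one hour before the cutoff.
        Msg oldSent = new Msg("u1", "u2", cutoff.minusHours(1));

        System.out.println(expected(oldSent, "u1", cutoff));    // false
        System.out.println(alternative(oldSent, "u1", cutoff)); // true
    }
}
```

With the second grouping, every old message that the user sent would pass the filter regardless of its timestamp, which looks like what I am seeing.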