4

I want to fetch records for newly created records in a table in postgresql as a live/continuous stream. Is it possible to use using spring r2dbc? If so what options do I have?

Thanks

Spartacus
  • 337
  • 4
  • 12
  • what do you mean by constant stream from db? – mslowiak Oct 31 '21 at 09:10
  • 1
    @mslowiak, whenever a record is inserted in the DB, the item is available to the subscriber. Or is it possible the component which inserts the record to DB, it can be inserted to a Queue which is polled using Flux.interval? – Spartacus Oct 31 '21 at 11:37

1 Answers1

1

You need to use pg_notify and start to listing on it. Any change that you want to see should be wrapped in simple trigger that will send message to pg_notify.

I have an example of this on my github, but long story short:

prepare function and trigger:

CREATE OR REPLACE FUNCTION notify_member_saved()
    RETURNS TRIGGER
AS $$
BEGIN
    PERFORM pg_notify('MEMBER_SAVED',  row_to_json(NEW)::text);
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER member_saved_trigger
    AFTER INSERT OR UPDATE
    ON members
    FOR EACH ROW
EXECUTE PROCEDURE notify_member_saved();

In java code prepare listener


@Service
@RequiredArgsConstructor
@Slf4j
class NotificationService {


    private final ConnectionFactory connectionFactory;
    private final Set<NotificationTopic> watchedTopics = Collections.synchronizedSet(new HashSet<>());

    @Qualifier("postgres-event-mapper")
    private final ObjectMapper objectMapper;

    private PostgresqlConnection connection;


    @PreDestroy
    private void preDestroy() {
        this.getConnection().close().subscribe();
    }

    private PostgresqlConnection getConnection() {
        if(connection == null) {
            synchronized(NotificationService.class) {
                if(connection == null) {
                    try {
                        connection = Mono.from(connectionFactory.create())
                                .cast(Wrapped.class)
                                .map(Wrapped::unwrap)
                                .cast(PostgresqlConnection.class)
                                .toFuture().get();
                    } catch(InterruptedException e) {
                        throw new RuntimeException(e);
                    } catch(ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        }
        return this.connection;
    }

    public <T> Flux<T> listen(final NotificationTopic topic, final Class<T> clazz) {

        if(!watchedTopics.contains(topic)) {
            executeListenStatement(topic);
        }

        return getConnection().getNotifications()
                .log("notifications")
                .filter(notification -> topic.name().equals(notification.getName()) && notification.getParameter() != null)
                .handle((notification, sink) -> {
                    final String json = notification.getParameter();
                    if(!StringUtils.isBlank(json)) {
                        try {
                            sink.next(objectMapper.readValue(json, clazz));
                        } catch(JsonProcessingException e) {
                            log.error(String.format("Problem deserializing an instance of [%s] " +
                                    "with the following json: %s ", clazz.getSimpleName(), json), e);
                            Mono.error(new DeserializationException(topic, e));
                        }
                    }
                });
    }

    private void executeListenStatement(final NotificationTopic topic) {
        getConnection().createStatement(String.format("LISTEN \"%s\"", topic)).execute()
                .doOnComplete(() -> watchedTopics.add(topic))
                .subscribe();
    }

    public void unlisten(final NotificationTopic topic) {
        if(watchedTopics.contains(topic)) {
            executeUnlistenStatement(topic);
        }
    }

    private void executeUnlistenStatement(final NotificationTopic topic) {
        getConnection().createStatement(String.format("UNLISTEN \"%s\"", topic)).execute()
                .doOnComplete(() -> watchedTopics.remove(topic))
                .subscribe();
    }
}

start listiong from controller

@GetMapping("/events")
    public Flux<ServerSentEvent<Object>> listenToEvents() {

        return Flux.merge(listenToDeletedItems(), listenToSavedItems())
                .map(o -> ServerSentEvent.builder()
                        .retry(Duration.ofSeconds(4L))
                        .event(o.getClass().getName())
                        .data(o).build()
                );

    }

    @GetMapping("/unevents")
    public Mono<ResponseEntity<Void>> unlistenToEvents() {
        unlistenToDeletedItems();
        unlistenToSavedItems();
        return Mono.just(
                ResponseEntity
                        .status(HttpStatus.I_AM_A_TEAPOT)
                        .body(null)
        );
    }

    private Flux<Member> listenToSavedItems() {
        return this.notificationService.listen(MEMBER_SAVED, Member.class);
    }


    private void unlistenToSavedItems() {
        this.notificationService.unlisten(MEMBER_SAVED);
    }

but remember that if something broke then you lost pg_notify events for some time so it is for non-mission-citical solutions.

Koziołek
  • 2,791
  • 1
  • 28
  • 48