12

I am trying to replace a PostgreSQL database poller with the reactive asynchronous postgres-async-driver and stream newly inserted rows to a Spring 5 Webflux Reactive websocket client like Josh Long's awesome example demoed here and based on Sébastien Deleuze's spring-reactive-playground.

My Publisher obtains the first row, but then does not return subsequent rows. Is the problem with my Observable, my Publisher, or with how I am using the postgres-async-driver Db?

public Observable<WebSocketMessage> getObservableWSM(WebSocketSession session){
    return
        // com.github.pgasync.Db
        db.queryRows(sql)
        // ~RowMapper method
        .map(row -> mapRowToDto(row))
        // serialize dto to String for websocket
        .map(dto -> { return objectMapper.writeValueAsString(dto); })
        // finally, write to websocket session 
        .map(str -> { return session.textMessage((String) str);
        });
}

Then, I wire the Observable into my WebSocketHandler using a RxReactiveStream.toPublisher converter:

@Bean
WebSocketHandler dbWebSocketHandler() {
    return session -> {
        Observable<WebSocketMessage> o = getObservableWSM(session);
        return session.send(Flux.from(RxReactiveStreams.toPublisher(o)));
    };
}

That will grab the first row from my sql statement, but no additional rows. How do I continue to stream additional rows?

Ideally, I think I want the PostgreSQL equivalent of a MongoDB Tailable cursor.

JJ Zabkar
  • 3,792
  • 7
  • 45
  • 65
  • 1
    Hi Zabkar, `postgres-async-driver` is not really reactive. Please see my [answer](https://stackoverflow.com/questions/48402550/webflux-postgresql/48402788#48402788). Tailable cursor works on **capped collections** wich is a specificity of MongoDB. – Montassar El Béhi Mar 21 '18 at 15:49
  • @montassar The linked answer also sidesteps Postgres. There's already a solution out there for Redis and MongoDB; I'm looking for a Postgres solution. – JJ Zabkar Mar 21 '18 at 16:48
  • 1
    Agree. I attended the Spring Boot 2.0 webinar by Phil Webb and we asked the question. The response was clearly: Not yet. I can notice that you use Rx-Java but I saw a **Reactor** project feature wich is `Scheduler` susceptible to help you. Please see Grygoriy Gonchar's [answer](https://stackoverflow.com/questions/42299455/spring-webflux-and-reading-from-database). Personnally, I see that as a workAround, but if you don't want to wait... :) – Montassar El Béhi Mar 21 '18 at 17:03

1 Answers1

3

I created a Postgres Trigger that fires on INSERTs to my table based on this example:

CREATE OR REPLACE FUNCTION table_update_notify() RETURNS trigger AS $$
DECLARE
  id bigint;
BEGIN
  IF TG_OP = 'INSERT' THEN
    id = NEW.id;
  ELSE
    id = OLD.id;
  END IF;
  PERFORM pg_notify('my_trigger_name', json_build_object('table', TG_TABLE_NAME, 'id', id, 'type', TG_OP)::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Then I subscribed to that Postgres Trigger using reactive-pg-client. Here is the code from their Pub/Sub example:

@Bean
PgPool subscribedNotificationHandler() {
    PgPool client = pgPool();
    client.getConnection(asyncResult -> {
        if (asyncResult.succeeded()) {
            PgConnection connection = asyncResult.result();
            connection.notificationHandler(notification -> {
                notification.getPayload();
                // do things with payload
            });
            connection.query("LISTEN my_trigger_name", ar -> {
                log.info("Subscribed to channel");
            });
        }
    });
    return client;
}
JJ Zabkar
  • 3,792
  • 7
  • 45
  • 65