I am trying to replace a PostgreSQL database poller with the reactive asynchronous postgres-async-driver and stream newly inserted rows to a Spring 5 Webflux Reactive websocket client like Josh Long's awesome example demoed here and based on Sébastien Deleuze's spring-reactive-playground.
My Publisher
obtains the first row
, but then does not return subsequent rows.
Is the problem with my Observable
, my Publisher
, or with how I am using the postgres-async-driver Db
?
public Observable<WebSocketMessage> getObservableWSM(WebSocketSession session){
return
// com.github.pgasync.Db
db.queryRows(sql)
// ~RowMapper method
.map(row -> mapRowToDto(row))
// serialize dto to String for websocket
.map(dto -> { return objectMapper.writeValueAsString(dto); })
// finally, write to websocket session
.map(str -> { return session.textMessage((String) str);
});
}
Then, I wire the Observable
into my WebSocketHandler
using a RxReactiveStream.toPublisher
converter:
@Bean
WebSocketHandler dbWebSocketHandler() {
return session -> {
Observable<WebSocketMessage> o = getObservableWSM(session);
return session.send(Flux.from(RxReactiveStreams.toPublisher(o)));
};
}
That will grab the first row
from my sql
statement, but no additional rows. How do I continue to stream additional rows?
Ideally, I think I want the PostgreSQL equivalent of a MongoDB Tailable cursor.