I've looked at the guide on reactive SQL clients (https://quarkus.io/guides/reactive-sql-clients#using), but I can't figure out how to work with transactions. Let's say I want to enhance the demo fruit app from that guide by using transactions.

How would I make the following method use a transaction that rolls back all changes if anything in the transaction fails?

public static Multi<Fruit> findAll(PgPool client) {
    return client.query("SELECT id, name FROM fruits ORDER BY name ASC")
            .onItem().produceMulti(set -> Multi.createFrom().items(() -> StreamSupport.stream(set.spliterator(), false)))
            .onItem().apply(Fruit::from);
}
Newcomer66

1 Answer

You can use a parent stage with then() or onItem().produceMulti() to give all of your sub-stages access to the parent stage item (in this case a Transaction). This will allow the later sub-stages to directly access the Transaction object for close/rollback.

For example:

return pgPool.begin()
    .onItem().produceMulti(tx -> {
        return tx.query("DELETE FROM fruits").execute()
                 .onItem().invoke(delete -> tx.query("SELECT id, name FROM fruits ORDER BY name ASC").execute())
                 .onItem().produceMulti(set -> Multi.createFrom().items(() -> StreamSupport.stream(set.spliterator(), false)))
                 .onItem().apply(Fruit::from)
                 .onFailure().invoke(ex -> tx.rollback())
                 .on().termination(() -> tx.close());
    });
Andy Guibert
  • Thanks! I considered this approach as well, but the code you supplied does not seem to close the connection/transaction when it succeeds. After executing this code 15 times, the application blocks because all the connections are still open. I tried adding an onCompleted at the end, but it is never called. How would I make sure that the connection/transaction is closed every time? Also, any chance you could show me how to run a DELETE query first and then the SELECT, rolling back everything, including the DELETE, when the SELECT fails? – Newcomer66 May 02 '20 at 16:37
  • Good point. I've updated my answer to use a different approach, and I also included the initial DELETE operation in the flow. – Andy Guibert May 04 '20 at 15:57
  • Instead of `invoke` you need `produceUni`. And instead of `on().termination()`, you may want to call `tx.commit()` just before the failure handling. – Clement May 04 '20 at 17:37
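
The control flow suggested in the last comment (chain the SELECT onto the DELETE, commit on success, roll back on failure) can be sketched with plain JDK types. This is not the Mutiny or Vert.x API: `FakeTx`, `deleteThenFindAll`, and the row values are hypothetical stand-ins invented for illustration, and `CompletableFuture.thenCompose` only roughly plays the role that `produceUni` is described as playing above. A minimal sketch under those assumptions:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class TxFlowSketch {

    /** Hypothetical stand-in for a transaction; it only records which calls were made. */
    static final class FakeTx {
        final List<String> log = new ArrayList<>();
        private final boolean failOnSelect;

        FakeTx(boolean failOnSelect) {
            this.failOnSelect = failOnSelect;
        }

        CompletableFuture<Integer> delete() {
            log.add("delete");
            return CompletableFuture.completedFuture(3); // pretend three rows were removed
        }

        CompletableFuture<List<String>> select() {
            log.add("select");
            CompletableFuture<List<String>> rows = new CompletableFuture<>();
            if (failOnSelect) {
                rows.completeExceptionally(new IllegalStateException("select failed"));
            } else {
                rows.complete(Arrays.asList("apple", "pear"));
            }
            return rows;
        }

        CompletableFuture<Void> commit() {
            log.add("commit");
            return CompletableFuture.completedFuture(null);
        }

        CompletableFuture<Void> rollback() {
            log.add("rollback");
            return CompletableFuture.completedFuture(null);
        }
    }

    /** DELETE, then SELECT, then commit; any failure along the way triggers a rollback. */
    static CompletableFuture<List<String>> deleteThenFindAll(FakeTx tx) {
        CompletableFuture<List<String>> work = tx.delete()
                // thenCompose waits for the returned future, so the SELECT is part of the chain
                .thenCompose(deleted -> tx.select())
                // commit only after both statements succeeded, then pass the rows along
                .thenCompose(rows -> tx.commit().thenApply(done -> rows));

        CompletableFuture<List<String>> result = new CompletableFuture<>();
        work.whenComplete((rows, failure) -> {
            if (failure == null) {
                result.complete(rows);
                return;
            }
            // Dependent stages wrap the original error in a CompletionException; unwrap it.
            Throwable cause = (failure instanceof CompletionException && failure.getCause() != null)
                    ? failure.getCause()
                    : failure;
            // Roll back first, then surface the original failure to the caller.
            tx.rollback().whenComplete((done, rollbackFailure) -> result.completeExceptionally(cause));
        });
        return result;
    }

    public static void main(String[] args) {
        FakeTx ok = new FakeTx(false);
        System.out.println(deleteThenFindAll(ok).join() + " " + ok.log);

        FakeTx failing = new FakeTx(true);
        System.out.println(deleteThenFindAll(failing).isCompletedExceptionally() + " " + failing.log);
    }
}
```

The point of the sketch is the difference between a side-effect callback and a chaining operator: a callback that merely starts the SELECT would not make later stages wait for it, whereas a composing operator makes the SELECT's outcome decide whether the commit or the rollback runs.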