I'm trying to insert java objects from a Flux
and generate responses per object, either successful or unsuccessful. Basically the code looks like:
private ReactiveMongoTemplate mongoTemplate;
public <T extends MyData, U extends MyResult> Flux<U> doTransfer(Flux<T> input, String collectionName) {
return input.buffer(100)
.map(lst -> mongoTemplate
.insertAll(lst)
.map(mydata -> generateResult(mydata))
.onErrorResume(DataAccessException.class, ex -> fluxForErrorCase(ex, lst))
)
.flatMap(Function.identity());
}
private <T extends MyData, U Extends MyResult> Flux<U> fluxForErrorCase(DataAccessException ex, List<T> input) {
// I only have #inserted, #updated and error message here.
// My input is not mutated to have non-null _ids, either
}
By defining a uniqueness constraint and sending appropriate data, I can get an exception (one with a MongoBulkWriteException
as cause) for the submitted batch, but I'd like to know which rows arrived into my database and with which I had issues. Moreover, some objects are written to the database but they are not mutated to have generated ids.
I know there can be alternative solutions by inserting in a multi-document transaction (requires a replica set for a simple integration test) or generating _id
values myself (requires a query for inserted values, or assuming an error causes succeeding items not to be written) but I'd like to hear if there are cleaner approaches for error handling here.