I'm trying to insert Java objects from a Flux and generate one response per object, indicating whether the insert succeeded or failed. The code looks roughly like this:

private ReactiveMongoTemplate mongoTemplate;

public <T extends MyData, U extends MyResult> Flux<U> doTransfer(Flux<T> input, String collectionName) {
    return input.buffer(100)
        // flatMap subscribes to each batch insert and merges the per-batch results
        .flatMap(lst -> mongoTemplate
                .insertAll(lst)
                .map(mydata -> generateResult(mydata))
                .onErrorResume(DataAccessException.class, ex -> fluxForErrorCase(ex, lst))
            );
}

private <T extends MyData, U extends MyResult> Flux<U> fluxForErrorCase(DataAccessException ex, List<T> input) {
   // I only have #inserted, #updated and error message here.
   // My input is not mutated to have non-null _ids, either
}

By defining a uniqueness constraint and sending appropriate data, I can get an exception (one with a MongoBulkWriteException as its cause) for the submitted batch, but I'd like to know which rows made it into my database and which ones failed. Moreover, some objects are written to the database, but the corresponding Java objects are not mutated to carry the generated ids.

I know there are alternative solutions, such as inserting in a multi-document transaction (which requires a replica set even for a simple integration test) or generating the _id values myself (which requires either a query for the inserted values or the assumption that an error prevents the subsequent items from being written), but I'd like to hear if there are cleaner approaches to error handling here.

1 Answer

In order to provide "a" solution, I'd like to share my approach:

MongoDB bulk writes have two operation modes: ordered and unordered. In ordered mode, no subsequent operations are carried out once an error is encountered; in unordered mode, the remaining operations are still attempted.
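
To make the difference concrete, here is a toy simulation in plain Java. It does not talk to MongoDB at all: the `Set` stands in for a collection with a unique index, and `insertMany` and `BulkModeDemo` are hypothetical names chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class BulkModeDemo {

    /**
     * Simulates a bulk insert into a collection with a unique index.
     * Returns the keys that were actually written by this call.
     */
    public static List<String> insertMany(Set<String> uniqueIndex, List<String> keys, boolean ordered) {
        List<String> written = new ArrayList<>();
        for (String key : keys) {
            if (uniqueIndex.add(key)) {
                written.add(key);   // no conflict: the document is stored
            } else if (ordered) {
                break;              // ordered: stop at the first error
            }
            // unordered: skip the failing document and keep going
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("a", "b", "a", "c");
        System.out.println(insertMany(new HashSet<>(), batch, true));   // prints [a, b]
        System.out.println(insertMany(new HashSet<>(), batch, false));  // prints [a, b, c]
    }
}
```

In the ordered run, "c" is never attempted because the duplicate "a" comes before it in the batch.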

After browsing the source of ReactiveMongoTemplate, one sees that a ReactiveMongoTemplate.insertAll() call eventually boils down to MongoCollection.insertMany() with a default InsertManyOptions, whose operation mode is ordered. At least, this is the case for 2.1.9.RELEASE.

Hence, generating _id values on the client side and treating the failing document and all documents after it as not inserted works.
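
A minimal sketch of that bookkeeping, in plain Java so it runs without a database. `Doc` and `Result` are hypothetical stand-ins for MyData and MyResult, and a UUID string stands in for whatever client-side _id you generate. The `failedIndex` argument is assumed to come from the driver's error report, for example `BulkWriteError.getIndex()` on the first entry of `MongoBulkWriteException.getWriteErrors()`; the sketch relies on the bulk write being ordered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

public class OrderedBatchOutcome {

    /** Hypothetical stand-in for MyData; the id is assigned on the client before the insert. */
    public static final class Doc {
        public final String id = UUID.randomUUID().toString();
        public final String payload;
        public Doc(String payload) { this.payload = payload; }
    }

    /** Hypothetical stand-in for MyResult. */
    public static final class Result {
        public final String id;
        public final boolean inserted;
        public final String message;
        public Result(String id, boolean inserted, String message) {
            this.id = id;
            this.inserted = inserted;
            this.message = message;
        }
    }

    /**
     * Builds one result per document of a failed ORDERED batch:
     * documents before failedIndex were written, the one at failedIndex
     * caused the error, and everything after it was never attempted.
     */
    public static List<Result> resultsForFailedBatch(List<Doc> batch, int failedIndex, String errorMessage) {
        List<Result> results = new ArrayList<>(batch.size());
        for (int i = 0; i < batch.size(); i++) {
            Doc doc = batch.get(i);
            if (i < failedIndex) {
                results.add(new Result(doc.id, true, null));
            } else if (i == failedIndex) {
                results.add(new Result(doc.id, false, errorMessage));
            } else {
                results.add(new Result(doc.id, false, "not attempted: an earlier write in the ordered batch failed"));
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<Doc> batch = Arrays.asList(new Doc("a"), new Doc("b"), new Doc("c"));
        for (Result r : resultsForFailedBatch(batch, 1, "duplicate key")) {
            System.out.println(r.id + " inserted=" + r.inserted + " message=" + r.message);
        }
    }
}
```

Because the ids are known before the write, each result can be tied back to its input object even though the template does not mutate the objects when the batch fails.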