1

Let's consider this function

@Transactional
fun conditionalInsertEntity(dbEntity: DBEntity): Mono<DBEntity> {
    return fetchObjectByPublicId(dbEntity.publicId)
        .switchIfEmpty {
            r2DatabaseClient.insert()
                .into(DBEntity::class.java)
                .using(Flux.just(dbEntity))
                .fetch()
                .one()
                .map { it["entity_id"] as Long }
                .flatMap { fetchObjectById(it) }
        }
}

while running above function with following driver code I get duplicate entry errors if the list contains duplicates. Ideally it shouldn't give that error because the above function is already handling the case for duplicate inserts!!

val result = Flux.fromIterable(listOf(dbEntity1, dbEntity1, dbEntity2))
    .flatMap { conditionalInsertEntity(it) }
    .collectList()
    .block()
  • I don't see the part that handles it, can you elaborate? – Adam Arold Nov 09 '20 at 08:17
  • the conditional insert function first tries to fetch entity by ID, and only if the entity with that id isn't available already, it inserts the entity (switchIfEmpty{} part) – Akash Gupta Nov 09 '20 at 09:34

1 Answers1

1

Realized that this is an issue of using flatMap instead of concatMap. ConcatMap collects the result from individual publishers sequentially unlike flatMap. (more here)

Because I used flatMap, multiple publishers thought that the entity isn't already available in the DB

  • 1
    I believe there is one more issue ie switchIfEmpty. switchIfEmpty is eagerly evaluated. Please read this https://stackoverflow.com/questions/54373920/mono-switchifempty-is-always-called. Have you unit tested when entity already exists in DB ?? – Harry Dec 21 '21 at 19:12