1

I'm new to vert.x framework and rx-java. I'm working with mongoDB currently. The problem is that I have to update same object several times, however only last update being executed. Code is provided below. findByProductId method returns required object. check is done as it might return false if object by this id is no in db yet.

fun cancelProductStateCountList(inventoryList: List<Inventory>) : Observable<List<ProductState>> {
    return Observable.create({ event ->
        val count = inventoryList.count()
        var counter = 0
        var itemCounter = 0
        val result = mutableListOf<ProductState>()
        var searchingId = ""
        Observable
                .fromArray(inventoryList)
                .flatMapIterable({
                    items -> items
                })
                .flatMap({
                    item ->
                    searchingId = item.productId!!
                    findByProductId(item.productId!!)
                })
                .flatMap ({
                    if(it is ProductState){
                        if(inventoryList[itemCounter].operation == InventoryOperation.HOLD.value()){
                            var quantityAvailable: Double = it.quantity!!
                            var quantityHold: Double = it.hold!!
                            quantityAvailable += inventoryList[itemCounter].quantity!!
                            quantityHold -= inventoryList[itemCounter].quantity!!
                            it.quantity = quantityAvailable
                            it.hold = quantityHold
                            itemCounter ++
                            updateWithoutDuplicate(it)
                        } else {
                            var quantityAvailable: Double = it.quantity!!
                            quantityAvailable += inventoryList[itemCounter].quantity!!
                            it.quantity = quantityAvailable
                            itemCounter ++
                            updateWithoutDuplicate(it)
                        }
                    } else {
                        Observable.just("")
                    }

                })
                .subscribe({
                    if(it is ProductState){
                        result.add(it)
                        counter ++
                        if (counter == count) {
                            event.onNext(result)
                        }
                    } else {
                        event.onError(NotExistsException("productId", searchingId))
                    }

                }, {
                    event.onError(it)
                })

    })
}

1 Answers1

1

There are a few problems with this code I can see:

  1. Use of flatMap() without any reason. Check how flatMap() works in this answer: When do you use map vs flatMap in RxJava?
  2. Using !!, aka bang-bang, in Kotlin. Usually that means you don't really understand how your API works, and will result in NullPointer eventually
  3. I guess that updateWithoutDuplicate() is the one going to MongoDB, although you didn't post actual code to it. But, seeing how you call it without any callback or await(), I'm assuming you expect it to behave synchronously. Considering that you're using both Vert.x and MongoDB, changes are, it's asynchronous.
Alexey Soshin
  • 16,718
  • 2
  • 31
  • 40