I'm new to vert.x framework and rx-java. I'm working with mongoDB currently. The problem is that I have to update same object several times, however only last update being executed. Code is provided below. findByProductId method returns required object. check is done as it might return false if object by this id is no in db yet.
fun cancelProductStateCountList(inventoryList: List<Inventory>) : Observable<List<ProductState>> {
return Observable.create({ event ->
val count = inventoryList.count()
var counter = 0
var itemCounter = 0
val result = mutableListOf<ProductState>()
var searchingId = ""
Observable
.fromArray(inventoryList)
.flatMapIterable({
items -> items
})
.flatMap({
item ->
searchingId = item.productId!!
findByProductId(item.productId!!)
})
.flatMap ({
if(it is ProductState){
if(inventoryList[itemCounter].operation == InventoryOperation.HOLD.value()){
var quantityAvailable: Double = it.quantity!!
var quantityHold: Double = it.hold!!
quantityAvailable += inventoryList[itemCounter].quantity!!
quantityHold -= inventoryList[itemCounter].quantity!!
it.quantity = quantityAvailable
it.hold = quantityHold
itemCounter ++
updateWithoutDuplicate(it)
} else {
var quantityAvailable: Double = it.quantity!!
quantityAvailable += inventoryList[itemCounter].quantity!!
it.quantity = quantityAvailable
itemCounter ++
updateWithoutDuplicate(it)
}
} else {
Observable.just("")
}
})
.subscribe({
if(it is ProductState){
result.add(it)
counter ++
if (counter == count) {
event.onNext(result)
}
} else {
event.onError(NotExistsException("productId", searchingId))
}
}, {
event.onError(it)
})
})
}