I want to use a synchronized block around the source of a flatMap. But I need the lock to cover the processing itself (the processItem method), not only the moment when the inner source is created.
This Observable is triggered every 5 minutes (for example by Observable.interval):
    Observable.fromIterable(getSourceData())
        .flatMapCompletable(item -> {
            synchronized (LockManager.getInstance().getLockObject(item.id)) {
                return processItem(item)
                    .subscribeOn(Schedulers.io());
            }
        });
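To show why the synchronized block above does not protect the processing, here is a minimal plain-Java sketch without RxJava. The class name DeferredLockDemo and the LOCK object are made up for illustration; LOCK stands in for whatever getLockObject(item.id) returns. The returned BooleanSupplier plays the role of the Completable: it is created inside the lock but executed later, after the lock has been released.

```java
import java.util.function.BooleanSupplier;

class DeferredLockDemo {
    // Stand-in for the object returned by LockManager.getLockObject(id) (assumption).
    static final Object LOCK = new Object();

    // Same shape as the flatMapCompletable lambda: the synchronized block
    // wraps only the creation of the deferred work, not its execution.
    static BooleanSupplier assembleUnderLock() {
        synchronized (LOCK) {
            // Deferred work: reports whether the calling thread holds LOCK when it runs.
            return () -> Thread.holdsLock(LOCK);
        }
    }

    // For comparison: checks the lock state while still inside the block.
    static boolean heldDuringAssembly() {
        synchronized (LOCK) {
            return Thread.holdsLock(LOCK);
        }
    }

    public static void main(String[] args) {
        BooleanSupplier deferred = assembleUnderLock();
        System.out.println("held while assembling: " + heldDuringAssembly());   // true
        System.out.println("held while running:    " + deferred.getAsBoolean()); // false
    }
}
```

The same thing happens with the Completable: the lock is released as soon as the lambda returns, and the actual work runs later on an io thread without it.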
My processItem method looks like this:
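    public Completable processItem(Item item) {
        // Fetch the current server version of the item...
        return mApiSource.getItemById(item.id)
            // ...then store it locally (the lambda parameter must not reuse the name 'item').
            .flatMap(fetched -> mItemRepository.replace(fetched))
            .toCompletable();
    }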
Both inner methods (getItemById and replace) return a Single.
This is part of a method for periodic updates from the server. I need to serialize the processItem call (periodic synchronization of an item from the server) with the methods that modify an Item (update, delete), which are called from other classes of the project (synchronization for item with ).
The main problem is that the periodic update can overwrite a freshly updated item.
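For context, this is roughly what LockManager is assumed to do. It is only a hypothetical sketch: the real class is not shown here, the id type is assumed to be long, and everything except getInstance and getLockObject is invented for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of a per-item lock registry; not the real implementation.
final class LockManager {
    private static final LockManager INSTANCE = new LockManager();

    // One lock object per item id. Entries are never removed in this sketch.
    private final ConcurrentMap<Long, Object> locks = new ConcurrentHashMap<>();

    private LockManager() {}

    static LockManager getInstance() {
        return INSTANCE;
    }

    // Returns the same object for equal ids, so every caller that synchronizes
    // on it for a given item excludes the others working on that item.
    Object getLockObject(long id) {
        return locks.computeIfAbsent(id, key -> new Object());
    }
}
```

With a registry like this, work on different items can still run in parallel, and only operations on the same id wait for each other.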
Currently I use this solution.
Chain for updating an item:
    public Completable updateItem(Item item) {
        return Completable.fromAction(() -> {
            // Hold the per-item lock for the whole update, including the blocking wait.
            synchronized (LockManager.getInstance().getLockObject(item.id)) {
                mApiSource.update(item)
                    .flatMap(updated -> mItemRepository.replace(updated))
                    .toCompletable()
                    .blockingAwait();
            }
        })
        .subscribeOn(Schedulers.io());
    }
Chain for periodic update:
    Observable.fromIterable(getSourceData())
        .flatMapCompletable(item -> {
            return Completable.fromAction(() -> {
                // Same per-item lock as in updateItem, held until processing finishes.
                synchronized (LockManager.getInstance().getLockObject(item.id)) {
                    processItem(item).blockingAwait();
                }
            })
            .subscribeOn(Schedulers.io());
        });
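To illustrate what this workaround gives and what it costs, here is a small plain-Java sketch without RxJava. The class BlockingUnderLockDemo and its method are hypothetical, and Thread.sleep stands in for the blockingAwait() on the network and database work. Several simulated chains compete for the same lock, and the sketch records how many were inside the critical section at once.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BlockingUnderLockDemo {
    // Stand-in for the lock object of a single item id (assumption).
    static final Object LOCK = new Object();

    // Runs `tasks` simulated chains on a thread pool and returns the highest
    // number of them that were inside the synchronized block at the same time.
    static int maxConcurrent(int tasks) {
        AtomicInteger active = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                synchronized (LOCK) {
                    int now = active.incrementAndGet();
                    max.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(20); // simulated blocking work under the lock
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    active.decrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent holders: " + maxConcurrent(4)); // 1
    }
}
```

The work is serialized as intended, but every chain that is waiting for the lock keeps one thread blocked for the whole time, which is the part that does not feel like RxJava.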
I know that this is not a clean RxJava solution.
Do you know a better solution for this problem?