
I want to use a synchronized block around the source of a flatMap. However, the lock must be held while the item is being processed (the processItem method), not only while the inner source is created.

This Observable is triggered every 5 minutes (for example by Observable.interval):

Observable.fromIterable(getSourceData())
.flatMapCompletable(item -> {
  synchronized(LockManager.getInstance().getLockObject(item.id)){
    return processItem(item)
          .subscribeOn(Schedulers.io());
  }
})
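To illustrate why the synchronized block in the snippet above does not cover the processing itself, here is a minimal sketch in plain Java. It is not RxJava code: a CompletableFuture on an executor stands in for the Completable returned by processItem(item).subscribeOn(Schedulers.io()), and the names LockScopeDemo, run and the lock object are hypothetical. The latch only forces the ordering that can occur in practice, where the deferred work runs after the lambda has returned.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the RxJava chain: the future plays the role of
// the Completable that is only assembled inside the synchronized block.
class LockScopeDemo {

    // Runs the scenario once and returns the recorded events in order.
    static List<String> run() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        Object lock = new Object(); // hypothetical per-item lock object
        CountDownLatch released = new CountDownLatch(1);
        ExecutorService io = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Void> work;
            synchronized (lock) {
                // Only the scheduling of the work happens under the lock.
                work = CompletableFuture.runAsync(() -> {
                    try {
                        released.await(); // wait until the block has exited
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    // The worker thread never owned the monitor.
                    events.add("processing; worker holds lock: "
                            + Thread.holdsLock(lock));
                }, io);
                events.add("assembled under lock");
            }
            events.add("lock released");
            released.countDown();
            work.join(); // wait for the deferred work to finish
        } finally {
            io.shutdown();
        }
        return events;
    }
}
```

Calling LockScopeDemo.run() records the assembly and the lock release before the processing step, and the processing step reports that its thread does not hold the lock. The same reasoning applies to the flatMapCompletable lambda: it returns as soon as the Completable is built, so the monitor is released before the work runs on the io scheduler.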

My processItem method looks like this:

public Completable processItem(Item item){
  return mApiSource.getItemById(item.id)
    // lambda parameter renamed: it must not shadow the method parameter "item"
    .flatMap(fetched ->
      mItemRepository.replace(fetched)
    )
    .toCompletable();
}

Both inner methods return a Single.

This is part of a method that performs periodic updates from the server. I need to serialize calls to processItem (periodic synchronization of an item from the server) with the methods that modify an Item (update, delete), which are called from other classes in the project (synchronization for item with ).

The main problem is that the periodic update can overwrite a newly updated item.
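The snippets in this question rely on LockManager.getInstance().getLockObject(id), which is not shown. A minimal sketch of what such a class might look like follows, assuming one monitor object per item id kept in a ConcurrentHashMap. The key type, the field names and the fact that entries are never removed are assumptions of this sketch, not details of the real class.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the LockManager referenced in the question.
final class LockManager {

    private static final LockManager INSTANCE = new LockManager();

    // One monitor object per item id. Entries are never evicted here,
    // so the map grows with the number of distinct ids.
    private final ConcurrentMap<Object, Object> locks = new ConcurrentHashMap<>();

    private LockManager() {
    }

    static LockManager getInstance() {
        return INSTANCE;
    }

    // Returns the same lock object for equal ids; computeIfAbsent creates
    // the object atomically, so two threads cannot get different locks
    // for the same id.
    Object getLockObject(Object id) {
        return locks.computeIfAbsent(id, key -> new Object());
    }
}
```

With this shape, the periodic update and updateItem contend on the same monitor only when they touch the same item id, while different items can still be processed in parallel.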

Currently I use this solution:

Chain for updating an item:

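public Completable updateItem(Item item){
  return Completable.fromAction(() -> {
        // hold the per-item lock until the whole chain has finished
        synchronized(LockManager.getInstance().getLockObject(item.id)){
          mApiSource.update(item)
          // lambda parameter renamed: it must not shadow the method parameter "item"
          .flatMap(updated ->
            mItemRepository.replace(updated)
          )
          .toCompletable()
          .blockingAwait();
        }
      })
      .subscribeOn(Schedulers.io());
}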

Chain for periodic update:

Observable.fromIterable(getSourceData())
    .flatMapCompletable(item -> {
      // a block-bodied lambda must return the inner Completable
      return Completable.fromAction(() -> {
        synchronized(LockManager.getInstance().getLockObject(item.id)){
          processItem(item).blockingAwait();
        }
      })
      .subscribeOn(Schedulers.io());
    });

I know this is not a clean RxJava solution.

Do you know a better solution for this problem?

elnino
  • "Main problem is that periodic update can rewrite newly updated item." Not clear where you see the problem. Periodic update is an update, and its goal is to update, and the update means rewrite. If this rewrite is a problem, just do not launch the periodic update. – Alexei Kaigorodov May 02 '18 at 23:26
  • The problem is that the data is not available on the server immediately, only after a delay. A new item is inserted, and the periodic update can run without this item, so the new item is lost. – elnino May 09 '18 at 17:01
