So I'm a beginner with RxJava but here's what I want to accomplish:

MainViewModel talks to the Repository. The Repository has both a LocalDataStore (which talks to the database) and a RemoteDataStore (Retrofit). Both are implementations of the DataStore interface.

What I want is a single fetchData call on the Repository that returns an Observable, but:

  • it takes it from the RemoteDataStore at first
  • after fetching every single thing (onNext()), it inserts it into the database
  • if it fails, it returns results from the LocalDataStore.

However, I don't know how to implement this logic. The subscription happens on the ViewModel's end, but I cannot really switch the observable to the LocalDataStore from the Repository's end (?). Upserting data into the database also returns an Observable (a Single, to be precise), and it needs a subscription to do anything.

Could someone explain it to me or point me in a good direction?

My code (problem in repository comments):

Remote data store

 override fun getData(): Observable<SomeData> = api
    .getData(token)
    .flatMapIterable { x -> x }

Local data store

override fun saveData(data: SomeData): Single<SomeData> {
    return database.upsert(data)
}

Repository

fun getData(): Observable<SomeData> {
    return remoteDataStore.getData()
            .doOnError {
                localDataStore.getData() //? this will invoke but nothing happens because I'm not subscribed to it
            }
            .doOnNext {
                saveData(it) //The same as before, nothing will happen
            }
}

ViewModel

override fun fetchData() {
repository.getData()
        .observeOn(androidScheduler)
        .subscribeOn(threadScheduler)
        .subscribe(
                { data: SomeData ->
                    dataList.add(data)
                },
                { throwable: Throwable? ->
                    handleError(throwable)
                },
                {
                    //send data to view
                },
                { disposable: Disposable ->
                    compositeDisposable.add(disposable)
                }
        )
}

Thank you for your time.

geekx
dabu

2 Answers

You need to use one of the onErrorResumeNext methods. I would also suggest changing your stream type from Observable to Single, since the nature of your data seems to be "get the data once or throw an error". It's just good API design.

In your particular case I would implement the repository this way:

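class RepositoryImpl @Inject constructor(
    private val localRepository: Repository,
    private val remoteRepository: Repository
) : Repository {
    override fun getData(): Single<Data> = remoteRepository.getData()
        .onErrorResumeNext { throwable: Throwable ->
            // A bare `return` is not allowed inside a Kotlin lambda, so the
            // if/else expression below is the lambda's result.
            if (throwable is IOException) {
                localRepository.getData()       // network failure: fall back to local data
            } else {
                Single.error<Data>(throwable)   // anything else is propagated
            }
        }
}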

You might ask why I only catch IOException. I usually handle only this exception so that I swallow unimportant network errors without missing anything critical. If you catch every exception, you might miss, for example, a NullPointerException.
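To make the IOException filter concrete, here is a minimal sketch, assuming RxJava 2 (the io.reactivex package). The helper fallbackOnNetworkError and the String payloads are hypothetical stand-ins for the real repositories, not part of the original code. It shows a network error being replaced by local data, while a NullPointerException still reaches the subscriber.

```kotlin
import io.reactivex.Single
import java.io.IOException

// Hypothetical helper: `remote` and `local` stand in for the two repositories.
fun fallbackOnNetworkError(remote: Single<String>, local: Single<String>): Single<String> =
    remote.onErrorResumeNext { throwable: Throwable ->
        // Only network-type failures are replaced by the local source.
        if (throwable is IOException) local else Single.error<String>(throwable)
    }

fun main() {
    val local = Single.just("cached")

    // Network failure: the local value is emitted instead of the error.
    fallbackOnNetworkError(Single.error<String>(IOException("timeout")), local)
        .test()
        .assertValue("cached")

    // Programming error: it is not swallowed and still reaches the subscriber.
    fallbackOnNetworkError(Single.error<String>(NullPointerException()), local)
        .test()
        .assertError(NullPointerException::class.java)
}
```

test() and the assert calls come from RxJava's own TestObserver, so the sketch needs no extra test library.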

andrei_zaitcev
  • Thank you for your well-put answer. I'm not yet sure about the syntax of this onErrorResumeNext call, so maybe you could help me with it. Using your code with a specified throwable (so t: Throwable -> {...}), I get an error that return is not allowed there. Is there an additional method that would let me save things by invoking a function that also returns an Observable? – dabu Aug 27 '17 at 10:16
  • @dabu have you changed the return type of your local and remote repositories? Make sure it's Single. If that doesn't help, could you create a GitHub gist, for instance, so I can take a look? – andrei_zaitcev Aug 27 '17 at 10:21
  • Thank you for your help. I've actually managed to get the desired behaviour; however, I'm not yet sure why it works. Here's the gist with the code: https://gist.github.com/rafal-adamek/0d8d0a17272d9d2eb730b1eac4d3c872 I needed to add an annotation and subscribe to saving. Of course it's not yet complete (I will implement some error checking), but it works. I will need to educate myself more about Rx to fully understand this code, but I'd appreciate any tips. – dabu Aug 27 '17 at 12:38
  • @dabu my bad. I forgot about that Kotlin behaviour. Take a look at https://kotlinlang.org/docs/reference/returns.html#return-at-labels and https://stackoverflow.com/a/40166597/921834. – andrei_zaitcev Aug 28 '17 at 08:25

onErrorResumeNext is what you're looking for. doOnError only invokes a side-effecting action; it doesn't replace the original Observable with another one.
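A minimal sketch of how this could look for the Observable-based pipeline from the question, assuming RxJava 2 (the io.reactivex package). The in-memory saved list, the save function, and the String items are hypothetical stand-ins for the real database upsert and SomeData. flatMapSingle subscribes to the Single returned by the save call for each item, so no manual subscription is needed, and onErrorResumeNext switches to the local source when the remote one fails.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single
import java.io.IOException

// Hypothetical in-memory "database"; stands in for the real upsert.
val saved = mutableListOf<String>()

fun save(item: String): Single<String> = Single.fromCallable {
    saved.add(item)
    item
}

fun getData(remote: Observable<String>, local: Observable<String>): Observable<String> =
    remote
        // Subscribes to the Single from save() for every emitted item
        // and re-emits the saved item downstream.
        .flatMapSingle { item -> save(item) }
        // On any upstream error, continue with the local source instead.
        .onErrorResumeNext(local)

fun main() {
    val local = Observable.just("cached")

    // Remote succeeds: items are saved and passed through.
    getData(Observable.just("a", "b"), local).test().assertResult("a", "b")
    check(saved == listOf("a", "b"))

    // Remote fails: the local items are emitted instead of the error.
    getData(Observable.error<String>(IOException("offline")), local)
        .test()
        .assertResult("cached")
}
```

Two caveats about this sketch: if the remote source fails partway through, the items already emitted stay delivered and the local items follow them, and because onErrorResumeNext sits after flatMapSingle, a failed save also triggers the fallback. Passing the Observable directly, rather than a lambda, avoids an overload ambiguity that Kotlin lambdas run into with Observable.onErrorResumeNext in RxJava 2.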

Abhijit Sarkar
  • Thank you for your help. You're of course right and I appreciate that, but I've marked @andrei_zaitcev's response as accepted because it made me understand the problem more clearly. I hope that's OK! Cheers! – dabu Aug 27 '17 at 12:40
  • @dabu you're free to accept whatever answer you prefer; thanks for your comment. – Abhijit Sarkar Aug 27 '17 at 18:44