4

I'm getting a OnErrorNotImplementedException thrown and the app crashes, despite handling the error downstream(?).

Exception

E/AndroidRuntime: FATAL EXCEPTION: RxCachedThreadScheduler-1
Process: pl.netlandgroup.smartsab, PID: 9920
io.reactivex.exceptions.OnErrorNotImplementedException: HTTP 401 Unauthorized
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
    at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74)
    at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
    at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
    at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onError(ObservableSubscribeOn.java:63)
    at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:56)
    at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:37)
    at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:43)
    at io.reactivex.Observable.subscribe(Observable.java:10838)
    at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
    at io.reactivex.Observable.subscribe(Observable.java:10838)
    at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
    at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452)
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61)
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52)
    at java.util.concurrent.FutureTask.run(FutureTask.java:237)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
    at java.lang.Thread.run(Thread.java:761)
 Caused by: retrofit2.adapter.rxjava2.HttpException: HTTP 401 Unauthorized
    at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:54)
    at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:37) 
    at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:43) 
    at io.reactivex.Observable.subscribe(Observable.java:10838) 
    at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34) 
    at io.reactivex.Observable.subscribe(Observable.java:10838) 
    at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96) 
    at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452) 
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61) 
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:237) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607) 
    at java.lang.Thread.run(Thread.java:761) 

Retrofit Repository:

class RetrofitRepository (retrofit: Retrofit) {

    val apiService: ApiService = retrofit.create(ApiService::class.java)
    var size: Int = 0

    fun getMapResponse(pageIndex: Int = 0): Observable<MapResponse> {
        return apiService.getMapResponse(pageIndex = pageIndex)
                .doOnError { Log.d("error", it.message) }
                .doOnNext { Log.d("currThread", Thread.currentThread().name) }

    }

    fun getItemsFormResponses(): Observable<List<Item>> {
        val list = mutableListOf<Observable<List<Item>>>()
        val resp0 = getMapResponse()
        resp0.subscribe { size = it.totalCount }
        var accum = 0
        do {
            list.add(getMapResponse(accum).map { it.items })
            accum++
        } while (list.size*200 < size)
        return Observable.merge(list)
    }
}

This results are observed by Interactor:

class MapInteractor @Inject constructor(private val repository: RetrofitRepository) {

    fun getMapItems(): Observable<MapViewState> {
        return repository.getItemsFormResponses()
                .map {
                    if(it.isEmpty()) {
                        return@map MapViewState.EmptyResult()
                    } else {
                        val mapItems = it.map { it.toMapItem() }
                        return@map MapViewState.MapResult(mapItems)
                    }
                }
                .doOnNext { Log.d("currThread", Thread.currentThread().name) }
                .startWith(MapViewState.Loading())
                .onErrorReturn { MapViewState.Error(it) }
    }
}

The onErrorReturn { MapViewState.Error(it) } emits correctly (right before the app crash I can see the correct thing rendered on screen). How can I avoid this exception while still maintaining the MVI architecture?

EDIT

The answer provided by dimsuz was the correct solution, although to achieve the merging and returning one Observable with all items it had to be modified to this:

fun getMapItems(): Observable<List<Item>> {
    return getMapResponse().map {
        val size = it.totalCount
        val list = mutableListOf<Observable<List<Item>>>()
        var accum = 0
        do {
            list.add(getMapResponse(accum++).map { it.items })
        } while (list.size*200 < size)
        return@map list.zip { it.flatten() }
    }.mergeAll()
}
Snayk
  • 43
  • 1
  • 4
  • I can't spot something wrong immediately. Can you add the whole stacktrace and the whole code of the Presenter (alternatively a link to your project if it is open source i.e. on github)? – sockeqwe Jul 05 '17 at 14:42

2 Answers2

4

Solution has already been shown in the previous answer, however the exact reason why you get 'OnErrorNotImplementedException' is that you probably subscribe with 'onSuccess' Consumer only.

You should add another consumer which will handle error events.

getMapResponse()
            .subscribe ( 
                     { size = it.totalCount },
                     {/* Do something with the error*/}
            )

'doOnError' will not prevent you from getting exceptions.

3

I believe the error is thrown in getItemsFromResponse() along the lines of:

    val resp0 = getMapResponse()
    resp0.subscribe { size = it.totalCount }

Here you subscribe, but do not handle the error case. Actually this code is incorrect, because you break the Rx chain in two independent pieces, you shouldn't do that.

What you should do is something like this:

fun getItemsFormResponses(): Observable<List<Item>> {
  return getMapResponse().map { resp0 ->
    val size = resp0.totalCount

    val list = mutableListOf<Observable<List<Item>>>()
    var accum = 0
    do {
      list.add(getMapResponse(accum).map { it.items })
      accum++
    } while (list.size*200 < size)
    return Observable.merge(list)
  }
}

I.e. extract size by extending the chain with an operator, rather then breaking it with subscribe().

dimsuz
  • 8,969
  • 8
  • 54
  • 88