
How do I keep a Retrofit call chain built with flatMapIterable running after an error, and stop iterating only when I get a specific HTTP status code?

I have a list of JSON requests saved in shared preferences that I need to send one by one using Retrofit, stopping only when I get a certain HTTP status code. If any other exception occurs, I just need to continue with the next item in the list. Whenever a request receives a successful response, I remove that request from the list. Requests that failed are not removed, and I save them back to shared preferences.

So far, I do this inside a ViewModel object. First, I fetch these requests via a method (paramRepository.getSavedOfflineRequest()) that returns an RxJava Observable<List<Request>>. I want to iterate through all the requests so that I can send the items as inputs to apiService.sale, which is my Retrofit call, so I use flatMapIterable. If the request is successful, I remove the request and save a Transaction object to DB.

public LiveData<UploadStatus> startUploading() {
    MutableLiveData<UploadStatus> uploadStatus = new MutableLiveData<>();
    compositeDisposable.add(paramRepository.getSavedOfflineRequest()
        .doOnComplete(() -> uploadStatus.setValue(UploadStatus.NO_ITEMS))
        .flatMapIterable( requests -> requests)
        .flatMapCompletable(request -> apiService.sale(saleUrl, BuildConfig.ApiKey,request)
            .doOnSuccess(response -> {
                requestList.remove(request);
                transactions.add(createTransaction(request, response));
            }).ignoreElement()
         )
        .andThen(saveUploadedToDb(transactions))
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(() -> uploadStatus.setValue(UploadStatus.SUCCESS),
            error -> {
                Log.d(TAG, "error");
                if (error instanceof HttpException) {
                    HttpException httpException = (HttpException) error;
                    int statusCode = httpException.code();
                    if (statusCode == 401) {
                        Log.d(TAG, "logged out");
                        uploadStatus.setValue(UploadStatus.LOGGED_OUT);
                    }
                } else {
                    uploadStatus.setValue(UploadStatus.FAIL);
                }
            }));

    return uploadStatus;
}

I expect that if I get any other error or exception, the chain simply continues calling apiService.sale with the next Request item. Instead, the whole chain stops as soon as one error is encountered, so the remaining Requests are never sent.
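The behaviour described above can be modelled without RxJava to make the intended control flow explicit. This is only a plain-Java sketch under stated assumptions: `UploadLoopSketch`, `HttpStatusException`, and the `send` function are hypothetical stand-ins for the ViewModel, Retrofit's `HttpException`, and `apiService.sale`; requests and responses are reduced to strings.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

class UploadLoopSketch {
    enum UploadStatus { SUCCESS, FAIL, LOGGED_OUT, NO_ITEMS }

    /** Hypothetical stand-in for Retrofit's HttpException: carries only a status code. */
    static class HttpStatusException extends RuntimeException {
        final int code;
        HttpStatusException(int code) {
            super("HTTP " + code);
            this.code = code;
        }
    }

    /**
     * Sends every pending request in order.
     * Successful requests are removed from 'pending' and recorded in 'uploaded'.
     * A 401 stops the loop immediately; any other failure is skipped and the
     * request stays in 'pending' so it can be saved again later.
     * 'pending' must be a mutable list.
     */
    static UploadStatus uploadAll(List<String> pending,
                                  Function<String, String> send,
                                  List<String> uploaded) {
        if (pending.isEmpty()) {
            return UploadStatus.NO_ITEMS;
        }
        boolean anyFailed = false;
        Iterator<String> it = pending.iterator();
        while (it.hasNext()) {
            String request = it.next();
            try {
                String response = send.apply(request);
                it.remove();                          // success: drop from the pending list
                uploaded.add(request + "->" + response);
            } catch (HttpStatusException e) {
                if (e.code == 401) {
                    return UploadStatus.LOGGED_OUT;   // the one status code that stops everything
                }
                anyFailed = true;                     // other HTTP errors: keep the request, continue
            } catch (RuntimeException e) {
                anyFailed = true;                     // non-HTTP failures: keep the request, continue
            }
        }
        return anyFailed ? UploadStatus.FAIL : UploadStatus.SUCCESS;
    }
}
```

In the Rx chain, the equivalent decision would have to be made on the inner call inside flatMapCompletable, since an error that reaches the outer stream terminates it. One possible way, assuming RxJava 2, is `onErrorComplete(predicate)` on the inner Completable, which swallows the error when the predicate returns true and propagates it otherwise.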

I have tried onErrorResumeNext, but it seems to expect me to return another kind of Exception, which is not what I want (I want to do nothing for the other Exceptions).

Alvin Dizon

1 Answer


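You probably want the observable to emit an UploadStatus for each request by mapping the response. This assumes apiService.sale returns the Retrofit Response wrapper (for example Single<Response<T>>), so that non-2xx responses arrive as values with a status code instead of as an HttpException.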

E.g.

    .map(response -> {
        // Translate the HTTP status code into an UploadStatus value
        switch (response.code()) {
            case 200:
                return UploadStatus.SUCCESS;
            case 401:
                return UploadStatus.LOGGED_OUT;
            default:
                return UploadStatus.FAIL;
        }
    })

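For exceptions (for example, network failures), you can apply onErrorReturn to the inner call, inside the flatMap lambda, so that it emits UploadStatus.FAIL/ERROR instead. Because the error is converted into a value there, it won't terminate the outer stream.

    .onErrorReturn(error -> UploadStatus.FAIL)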
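The idea behind this answer, turning every outcome into a value instead of an error, can be illustrated in plain Java. This is a minimal sketch, not the Rx operators themselves: `StatusMappingSketch`, `fromCode`, and `attempt` are hypothetical names, and the `Callable<Integer>` stands in for a call that yields an HTTP status code.

```java
import java.util.concurrent.Callable;

class StatusMappingSketch {
    enum UploadStatus { SUCCESS, FAIL, LOGGED_OUT }

    /** Mirrors the map step: HTTP status code to UploadStatus. */
    static UploadStatus fromCode(int code) {
        switch (code) {
            case 200:
                return UploadStatus.SUCCESS;
            case 401:
                return UploadStatus.LOGGED_OUT;
            default:
                return UploadStatus.FAIL;
        }
    }

    /** Mirrors the onErrorReturn step: an exception becomes FAIL instead of propagating. */
    static UploadStatus attempt(Callable<Integer> call) {
        try {
            return fromCode(call.call());
        } catch (Exception e) {
            return UploadStatus.FAIL;
        }
    }
}
```

With every request reduced to a status value, the caller can decide per item whether to keep going, which is how the 401 case can be treated differently from other failures.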
Damon Baker
  • I refactored my code to be similar to the answer for this post describing a situation very similar to mine: https://stackoverflow.com/questions/52617904/rxjava-upstream-never-completes-when-error-is-swallowed?rq=1 – Alvin Dizon Jul 19 '19 at 01:00