22

I want to make a request to a URL using OkHttp on another thread (such as an IO thread) and receive the Response on the Android main thread, but I don't know how to create an Observable for this.

Marcin Koziński

4 Answers

20

It's easier and safer to use Observable.defer() instead of Observable.create():

final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
    @Override public Observable<Response> call() {
        try {
            Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
            return Observable.just(response);
        } catch (IOException e) {
            return Observable.error(e);
        }
    }
});

That way unsubscription and backpressure are handled for you. Here's a great post by Dan Lew about create() and defer().

If you want to go the Observable.create() route, it should look more like the code in this library, with isUnsubscribed() checks sprinkled throughout. And I believe that still doesn't handle backpressure.
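The two ideas in this answer (the blocking call does not run until something subscribes, and the result is handed over to a different thread) can be sketched with the JDK alone. This is not RxJava: `CompletableFuture` and two single-thread executors stand in for `subscribeOn(Schedulers.io())` and `observeOn(AndroidSchedulers.mainThread())`, and `fakeBlockingRequest`, `subscribe` and the thread names are hypothetical names made up for the illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class DeferredCallSketch {
    /** Counts how many times the fake "request" has actually run. */
    static final AtomicInteger executions = new AtomicInteger();

    /** Hypothetical stand-in for client.newCall(request).execute(). */
    static String fakeBlockingRequest() {
        executions.incrementAndGet();
        return "response from " + Thread.currentThread().getName();
    }

    /**
     * Runs the deferred work on an "io" thread, then hands the result to a second
     * thread that stands in for the Android main thread.
     * Returns { result, name of the thread that received it }.
     */
    static String[] subscribe(Callable<String> work) throws Exception {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread-stand-in"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> {
                        try {
                            return work.call(); // the blocking call happens here, on the io thread
                        } catch (Exception e) {
                            throw new CompletionException(e); // surfaces as the failure of the future
                        }
                    }, io)
                    .thenApplyAsync(
                            response -> new String[] {response, Thread.currentThread().getName()},
                            main)
                    .get();
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Wrapping the call in a Callable executes nothing yet.
        Callable<String> deferred = DeferredCallSketch::fakeBlockingRequest;
        System.out.println("executions before subscribe: " + executions.get());

        String[] result = subscribe(deferred);
        System.out.println(result[0] + ", delivered on " + result[1]);
    }
}
```

In the RxJava version the same split is expressed by chaining `subscribeOn` and `observeOn` onto the deferred Observable before calling `subscribe`.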

Marcin Koziński
13

I realise this post is a bit old, but there's now a newer and more convenient way of doing this (the snippet is Kotlin):

Observable.fromCallable {
    client.newCall(Request.Builder().url("your url").build()).execute()
}

More info: https://artemzin.com/blog/rxjava-defer-execution-of-function-via-fromcallable/

frankelot
  • Could you please clarify the code? A `new` is missing before `Request`, and after that it throws an error – Jose_GD Aug 23 '17 at 20:42
  • 5
    This is not Java, it's Kotlin – frankelot Aug 23 '17 at 21:06
  • Oops... missed that, sorry – Jose_GD Aug 24 '17 at 02:57
  • 1
    What versions of rxjava / okhttp are you using for this? newCall.execute() returns a Response object which isn't callable? – Mike Oct 16 '17 at 18:50
  • rxjava:2.1.5 / okhttp:3.9.0 - Callable means any method or function. Check this out: https://artemzin.com/blog/rxjava-defer-execution-of-function-via-fromcallable/ – frankelot Oct 16 '17 at 19:15
  • Ah yes it was the Kotlin that tripped me up, forgot to toss in the () -> {} for the Java call – Mike Oct 17 '17 at 17:53
  • 1
    Beware what happens when your subscription gets cancelled before... you would get one very generic error - UndeliverableException. – kboom Mar 15 '19 at 14:26
0

I'm late to the discussion, but if for some reason the code needs to stream the response body, then defer or fromCallable won't do it. Instead, one can use the using operator.

Single.using(() -> okHttpClient.newCall(okRequest).execute(), // 1
             response -> { // 2
                 ...

                 return Single.just((Consumer<OutputStream>) responseBodyOutput -> {
                     try (InputStream upstreamResponseStream = response.body().byteStream();
                          OutputStream fileOutput = responseBodyOutput) {
                         ByteStreams.copy(upstreamResponseStream, fileOutput);
                     }
                 });
             },
             Response::close, // 3
             false) // 4
      .subscribeOn(Schedulers.io()) // 5
      .subscribe(copier -> copier.accept(...), // 6
                 throwable -> ...); // 7
  1. The first lambda executes the request and obtains the response upon subscription.
  2. The second lambda creates the observable type, here with Single.just(...).
  3. The third lambda disposes of the response. With defer, one could have used the try-with-resources style instead.
  4. Set the eager toggle to false so that the disposer is called after the terminal event, i.e. after the subscription consumer has been executed.
  5. Of course, make the work happen on another thread pool.
  6. Here's the lambda that consumes the response body. Without eager set to false, the code raises an IOException with reason 'closed', because the response is already closed before this lambda is entered.
  7. The onError lambda should handle exceptions, especially the IOException, which can no longer be caught inside the using operator the way it could with a try/catch in defer.
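To make points 4 and 6 concrete, here is a small JDK-only model of the ordering. It is not RxJava's implementation: the `using` method below is a simplified, hypothetical re-creation of the success path only, and `FakeResponse` is a made-up stand-in for a response whose body cannot be read once it is closed.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

final class UsingOrderSketch {
    /** Hypothetical stand-in for a response whose body is unreadable after close(). */
    static final class FakeResponse {
        private boolean closed;

        String readBody() {
            if (closed) {
                throw new IllegalStateException("closed");
            }
            return "body";
        }

        void close() {
            closed = true;
        }
    }

    /**
     * Simplified model of the success path: with eager == true the disposer runs
     * before the item reaches the consumer, with eager == false it runs afterwards.
     */
    static <R, T> void using(Supplier<R> resourceSupplier,
                             Function<R, T> mapper,
                             Consumer<R> disposer,
                             boolean eager,
                             Consumer<T> onSuccess) {
        R resource = resourceSupplier.get();
        T item = mapper.apply(resource);
        if (eager) {
            disposer.accept(resource);
            onSuccess.accept(item);
        } else {
            try {
                onSuccess.accept(item);
            } finally {
                disposer.accept(resource);
            }
        }
    }

    /** Reads the body lazily inside the consumer, like the copier in the answer. */
    static String consume(boolean eager) {
        StringBuilder out = new StringBuilder();
        try {
            UsingOrderSketch.<FakeResponse, Supplier<String>>using(
                    FakeResponse::new,
                    response -> response::readBody, // emits a lazy reader, not the body itself
                    FakeResponse::close,
                    eager,
                    reader -> out.append(reader.get()));
        } catch (IllegalStateException e) {
            out.append("failed: ").append(e.getMessage());
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println("eager=false -> " + consume(false));
        System.out.println("eager=true  -> " + consume(true));
    }
}
```

Because the emitted item only reads the body when the consumer invokes it, the resource has to stay open until the consumer returns, which is why the non-eager setting matters here.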
bric3
0

OkHttp3 with an RxJava Single for a background API call.

    Disposable disposable = Single.fromCallable(() -> {
        Log.e(TAG, "clearData: Thread[" + Thread.currentThread().getName() + "]");
        OkHttpClient client = Util.getHttpClient();
        Request request = new Request.Builder()
                .addHeader("Authorization", "Bearer " + Util.getUserToken())
                .url(BuildConfig.BASE_URL + ApiConstants.DELETE_FEEDS)
                .build();

        // try-with-resources closes the response (and its body) when done
        try (Response response = client.newCall(request).execute()) {
            if (response.isSuccessful()) {
                ...
                return ; // Any type
            } else {
                return ; // Any type
            }
        }
    }).subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(result -> Log.d(TAG, "api() completed"),
                 // Without an error consumer, a failed call surfaces as OnErrorNotImplementedException
                 throwable -> Log.e(TAG, "api() failed", throwable));

    compositeDisposable.add(disposable);
Arul
  • 1
    Keep in mind that if you close the subscription before the HTTP call completes, it will crash. – f.khantsis May 09 '20 at 18:51
  • @f.khantsis then what is the way to avoid this, wrap the HTTP call with try-catch – Arul May 12 '20 at 12:14
  • 1
    If you do that, you will lose out on exceptions. I just created a `MaybeFromUnsafeCallable` class which mimics MaybeFromCallable, but does not throw an exception when the function threw after the subscription was already closed. Ditto for ObservableFromUnsafeCallable, etc. – f.khantsis May 12 '20 at 15:21