I wrap the Firebase listener in an Observable with the code below,
public Observable<String> getIdToken() {
    return Observable.create(emitter -> {
        firebaseAuth.getAccessToken(false)
                .addOnSuccessListener(getTokenResult -> {
                    emitter.onNext(getTokenResult.getToken());
                })
                .addOnFailureListener(e -> {
                    if (e instanceof FirebaseAuthInvalidUserException) {
                        emitter.onError(new BaseException("", ERR_CODE_ACCOUNT_BANNED));
                    } else {
                        emitter.onError(e);
                    }
                });
    });
}
public Observable<BaseResponse> register() {
    return getIdToken()
            .flatMap(idToken -> userApi.register(idToken));
}
but I ran into a bug:
    .flatMap(idToken -> userApi.register(idToken));
This line runs on the main thread. I think that
    firebaseAuth.getAccessToken(false)
switches to a worker thread internally and performs a network request to Google Firebase. When it succeeds, its internal code switches back to the current thread (the main thread), which makes
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
have no effect. How can I handle this thread switch so that flatMap runs on a worker thread? I call the register function like this:
Disposable disposable = UserModel.getInstance().register()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(baseResponse -> {
            Toast.makeText(this, "a", Toast.LENGTH_SHORT).show();
        }, throwable -> {
            throwable.printStackTrace();
            Log.d("MainActivity111", throwable.getMessage());
        });
Logcat shows this error:
08-28 11:18:26.634 6870-6870/? W/System.err: android.os.NetworkOnMainThreadException
at android.os.StrictMode$AndroidBlockGuardPolicy.onNetwork(StrictMode.java:1450)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:355)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:357)
at java.net.Socket.connect(Socket.java:616)
at okhttp3.internal.platform.AndroidPlatform.connectSocket(AndroidPlatform.java:71)
at okhttp3.internal.connection.RealConnection.connectSocket(RealConnection.java:240)
at okhttp3.internal.connection.RealConnection.connect(RealConnection.java:160)
at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:257)
at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:135)
at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:114)
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
08-28 11:18:26.635 6870-6870/? W/System.err: at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
08-28 11:18:26.636 6870-6870/? W/System.err: at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:126)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:200)
at okhttp3.RealCall.execute(RealCall.java:77)
08-28 11:18:26.637 6870-6870/? W/System.err: at retrofit2.OkHttpCall.execute(OkHttpCall.java:180)
at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:42)
at io.reactivex.Observable.subscribe(Observable.java:12036)
08-28 11:18:26.638 6870-6870/? W/System.err: at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
at io.reactivex.Observable.subscribe(Observable.java:12036)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.subscribeInner(ObservableFlatMap.java:165)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:139)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onNext(ObservableCreate.java:67)
08-28 11:18:26.639 6870-6870/? W/System.err: at cn.candrwow.coincoin.model.UserModel.lambda$null$0(UserModel.java:37)
at cn.candrwow.coincoin.model.-$$Lambda$UserModel$TP1OIyNide1WpDi-afDDpxaJhcE.onSuccess(Unknown Source:4)
at com.google.android.gms.tasks.zzn.run(Unknown Source:27)
at android.os.Handler.handleCallback(Handler.java:790)
08-28 11:18:26.640 6870-6870/? W/System.err: at android.os.Handler.dispatchMessage(Handler.java:99)
at android.os.Looper.loop(Looper.java:164)
at android.app.ActivityThread.main(ActivityThread.java:6494)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:438)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:807)
Update
I found that the Firebase listener's internal code forces a switch to the main thread, not to the thread that called it, so the call from the problem above,
    emitter.onNext(getTokenResult.getToken());
is forced onto the main thread, not the thread set by
    .subscribeOn(Schedulers.io())
The Firebase internals are not open source, so I logged the thread names with the code below (this is in Activity.onCreate) and confirmed the forced switch:
new Thread(() -> {
    Log.d("TestActivity", Thread.currentThread().getName());
    FirebaseAuth.getInstance().getAccessToken(false)
            .addOnSuccessListener(getTokenResult -> {
                Log.d("TestActivity", Thread.currentThread().getName());
            });
}).start();
The logcat output is:
08-28 16:47:47.232 12289-12384/? D/TestActivity: Thread-5
main
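The same effect can be reproduced without Firebase or RxJava. Below is a minimal sketch using only java.util.concurrent; FakeTokenApi, the "fake-main" and "fake-io" thread names, and the "fake-token" value are all hypothetical stand-ins, not Firebase APIs. It only illustrates that a callback-style API which posts results to one fixed thread ignores the thread it was called from.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class CallbackThreadDemo {

    /** Hypothetical stand-in for an SDK that always delivers results on one fixed thread. */
    static final class FakeTokenApi {
        private final ExecutorService callbackThread;

        FakeTokenApi(ExecutorService callbackThread) {
            this.callbackThread = callbackThread;
        }

        void getToken(Consumer<String> onSuccess) {
            // The caller's thread is ignored: the callback is posted to the fixed executor.
            callbackThread.execute(() -> onSuccess.accept("fake-token"));
        }
    }

    /** Returns {name of the calling thread, name of the thread that ran the callback}. */
    static String[] observeThreads() {
        ExecutorService fakeMain = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-main"));
        ExecutorService fakeIo = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-io"));
        String[] names = new String[2];
        CountDownLatch done = new CountDownLatch(1);
        try {
            // Call the API from the worker thread, as subscribeOn(Schedulers.io()) would.
            fakeIo.execute(() -> {
                names[0] = Thread.currentThread().getName();
                new FakeTokenApi(fakeMain).getToken(token -> {
                    names[1] = Thread.currentThread().getName();
                    done.countDown();
                });
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callback did not arrive in time");
            }
            return names;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            fakeMain.shutdown();
            fakeIo.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = observeThreads();
        System.out.println("caller:   " + names[0]); // fake-io
        System.out.println("callback: " + names[1]); // fake-main
    }
}
```

In this sketch the call is made on fake-io but the callback runs on fake-main, which matches the Thread-5 / main pair in the log above.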
Adding .observeOn(Schedulers.io()) before flatMap works, and the advice from the answers (subscribeOn on the inner Observable) works too.
public Observable<BaseResponse> register() {
    return getIdToken()
            // .observeOn(Schedulers.io())
            .flatMap(idToken -> userApi.register(idToken).subscribeOn(Schedulers.io()));
}
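Why the explicit hand-off helps can be shown with plain CompletableFuture as a rough analogy; this is not RxJava and not how RxJava is implemented. In this sketch, thenApply plays the role of the original flatMap (it runs on whichever thread delivers the value), and thenApplyAsync with an executor plays the role of observeOn(Schedulers.io()) placed before the follow-up step. The HandOffDemo class and the "fake-main" / "fake-io" thread names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class HandOffDemo {

    private static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    /** Returns the name of the thread that ran the step following the token callback. */
    static String followUpThread(boolean handOff) {
        ExecutorService fakeMain = named("fake-main");
        ExecutorService fakeIo = named("fake-io");
        try {
            CompletableFuture<String> token = new CompletableFuture<>();
            // The follow-up step is registered before the token arrives.
            CompletableFuture<String> followUp = handOff
                    // Explicit hand-off: the step is dispatched to the worker executor.
                    ? token.thenApplyAsync(t -> Thread.currentThread().getName(), fakeIo)
                    // No hand-off: the step runs on the thread that completes the future.
                    : token.thenApply(t -> Thread.currentThread().getName());
            // The SDK-style callback delivers the token on the fixed "main" thread.
            fakeMain.execute(() -> token.complete("fake-token"));
            return followUp.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            fakeMain.shutdown();
            fakeIo.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("without hand-off: " + followUpThread(false)); // fake-main
        System.out.println("with hand-off:    " + followUpThread(true));  // fake-io
    }
}
```

The point of the analogy is that the scheduler chosen at subscription time does not decide where a later callback fires; the step after the callback has to be moved to a worker explicitly, which is what both variants in register() above do.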
This is my UserApi interface:
public interface UserApi {
    @FormUrlEncoded
    @POST("account/register")
    Observable<BaseResponse> register(@Field("idToken") String idToken);

    @GET("account/testGoogle")
    Observable<BaseResponse> test(@Query("idToken") String idToken);
}