11

Brief introduction for those not familiar with Android and/or Firebase development:

In Android development, you should always manipulate your application's views from the main thread (also called UI thread), but if your application needs to make some heavy processing, it should use a background thread, otherwise the app would seem unresponsive.

Firebase is a service that offers a way to store and sync data with a NoSQL database in the cloud. It also offers an Android SDK to manage this database. Every time this SDK is used to make an operation, like a query, Firebase avoids those threading pitfalls by making all of its heavy processing on its own internal background thread and by always calling its callbacks on the main thread.

Example:

Query postsQuery = FirebaseDatabase.getInstance().getReference("posts");

ValueEventListener postListener = new ValueEventListener() {
  @Override
  public void onDataChange(DataSnapshot dataSnapshot) {
    // This is always called on the main thread
    // Get Post object and use the values to update the UI
    Post post = dataSnapshot.getValue(Post.class);
    // ...
  }

  @Override
  public void onCancelled(DatabaseError databaseError) {
    // Getting Post failed, log a message
    printError(databaseError.toException());
    // ...
  }
};

postsQuery.addValueEventListener(postListener);

The actual issue I'm facing:

I'm trying to wrap Firebase's query listeners with RxJava using a method like this:

private static Observable<DataSnapshot> queryObservable(final Query query) {
  return Observable.fromEmitter(emitter -> {
    // This is called on the Scheduler's thread defined with .subscribeOn()
    printThread("emitter");
    final ValueEventListener listener = new ValueEventListener() {
      @Override public void onDataChange(final DataSnapshot dataSnapshot) {
        // This is always called on the main thread
        printThread("onDataChange");
        emitter.onNext(dataSnapshot);
      }

      @Override public void onCancelled(final DatabaseError databaseError) {
        // This is called on the main thread too
        emitter.onError(databaseError.toException());
      }
    };

    query.addValueEventListener(listener);

    emitter.setCancellation(() -> query.removeEventListener(listener));
  }, Emitter.BackpressureMode.BUFFER);
}

But because the Observable is emitting items from inside the Firebase's callback (called on the main thread) any further .subscribeOn() operators are going to be ignored.

For example, calling the above method like this:

Query postsQuery = FirebaseDatabase.getInstance().getReference("posts");

queryObservable(postsQuery).doOnSubscribe(() -> printThread("onSubscribe"))
    .subscribeOn(Schedulers.io())
    .subscribe(dataSnapshot -> printThread("onNext"));

Would print the following:

onSubscribe Thread: RxIoScheduler-2
emitter Thread: RxIoScheduler-2
onDataChange Thread: main
onNext Thread: main

From what I understand, when Firebase's SDK calls the onDataChange() callback and switches from its own internal background thread to the main thread, it also makes the Observable emit new items on the main thread, rendering useless any .subscribeOn() operator down the stream.

The actual question:

What can I do to not only correctly wrap listeners like this into an Observable but also make them conform to the Scheduler defined by .subscribeOn()?

Thank you!

Update:

I know .observeOn() gives me the ability to process the data returned by Firebase on another thread. That's what I'm doing already, but it just isn't the point of this question. The point is: when I pass a Scheduler through .subscribeOn() I expect the upstream to conform to that Scheduler's thread but that doesn't happen when the Observable has an internal listener that is being triggered from a callback on a different thread. When that happens, I lose the .subscribeOn() guarantee.

The severity of this issue may not seem obvious at first, but what if that Observable was part of a library? What's the best practice there? Should the library enforce its clients to always call an .observeOn() after any call to that method? Should the library call an .observeOn() itself and call it a "default Scheduler"? In any of these cases the .subscribeOn() is just useless, and that doesn't seem right to me.

Community
  • 1
  • 1
Renan Ferrari
  • 2,449
  • 5
  • 21
  • 26
  • 2
    Seems that only option is to call `observeOn` after the query to force change the downstream thread. – Maksim Ostrovidov Nov 28 '16 at 23:55
  • @maxost That's actually the current workaround I'm using. The problem is that it still ignores the Scheduler defined by a `.subscribeOn()` further down the stream. That would make the `queryObservable()` method above a bad API. I'm hoping for a proper solution to that. – Renan Ferrari Nov 29 '16 at 12:09
  • @RenanFerrari did you find a solution to this? Thnx – Jo Dar Gaya Woh Mar Gaya Apr 17 '17 at 18:34
  • @GabbarSingh Not really. I kept doing little work inside the callback and calling `observeOn` as soon as possible afterwards. – Renan Ferrari Apr 17 '17 at 22:11
  • Did you find any solution for this? – yoonhok Feb 11 '20 at 15:31
  • @yoonhok Never did, but I haven't looked into this for a long time. If you happen to find a good solution, please submit it :) – Renan Ferrari Feb 11 '20 at 19:28
  • Yesterday, I asked a similar question and I found the reason and solution, could you check it, please? https://stackoverflow.com/questions/60172733/why-does-thread-changed-to-main-thread-in-callback/60181082#60181082 – yoonhok Feb 12 '20 at 04:22

2 Answers2

0

Just use observeOn in IO and subscribeOn in Main Thread, in that way you can manage your recieved that in MainThread and move the firebase work to a different Thread.

Remember to import rxAndroid to your gradle(Rxjava or RxJava 2):

 compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

Also suggest you to check as reference(or just use it) one of the next libraries:

RxJava : https://github.com/nmoskalenko/RxFirebase

RxJava 2.0: https://github.com/FrangSierra/Rx2Firebase

One of them works with RxJava and the other one with the new RC of RxJava 2.0. If you are interested of it, you can see the differences between both here.

Francisco Durdin Garcia
  • 12,540
  • 9
  • 53
  • 95
-1

I Had the same problem, finally I combined with Coroutines to run the listener on background. To do that, simple add coroutine background work in your onDataChange

Kind regards