Brief introduction for those not familiar with Android and/or Firebase development:
In Android development, you should always manipulate your application's views from the main thread (also called UI thread), but if your application needs to make some heavy processing, it should use a background thread, otherwise the app would seem unresponsive.
Firebase is a service that offers a way to store and sync data with a NoSQL database in the cloud. It also offers an Android SDK to manage this database. Every time this SDK is used to make an operation, like a query, Firebase avoids those threading pitfalls by making all of its heavy processing on its own internal background thread and by always calling its callbacks on the main thread.
Example:
Query postsQuery = FirebaseDatabase.getInstance().getReference("posts");
ValueEventListener postListener = new ValueEventListener() {
@Override
public void onDataChange(DataSnapshot dataSnapshot) {
// This is always called on the main thread
// Get Post object and use the values to update the UI
Post post = dataSnapshot.getValue(Post.class);
// ...
}
@Override
public void onCancelled(DatabaseError databaseError) {
// Getting Post failed, log a message
printError(databaseError.toException());
// ...
}
};
postsQuery.addValueEventListener(postListener);
The actual issue I'm facing:
I'm trying to wrap Firebase's query listeners with RxJava using a method like this:
private static Observable<DataSnapshot> queryObservable(final Query query) {
return Observable.fromEmitter(emitter -> {
// This is called on the Scheduler's thread defined with .subscribeOn()
printThread("emitter");
final ValueEventListener listener = new ValueEventListener() {
@Override public void onDataChange(final DataSnapshot dataSnapshot) {
// This is always called on the main thread
printThread("onDataChange");
emitter.onNext(dataSnapshot);
}
@Override public void onCancelled(final DatabaseError databaseError) {
// This is called on the main thread too
emitter.onError(databaseError.toException());
}
};
query.addValueEventListener(listener);
emitter.setCancellation(() -> query.removeEventListener(listener));
}, Emitter.BackpressureMode.BUFFER);
}
But because the Observable is emitting items from inside the Firebase's callback (called on the main thread) any further .subscribeOn()
operators are going to be ignored.
For example, calling the above method like this:
Query postsQuery = FirebaseDatabase.getInstance().getReference("posts");
queryObservable(postsQuery).doOnSubscribe(() -> printThread("onSubscribe"))
.subscribeOn(Schedulers.io())
.subscribe(dataSnapshot -> printThread("onNext"));
Would print the following:
onSubscribe Thread: RxIoScheduler-2
emitter Thread: RxIoScheduler-2
onDataChange Thread: main
onNext Thread: main
From what I understand, when Firebase's SDK calls the onDataChange()
callback and switches from its own internal background thread to the main thread, it also makes the Observable emit new items on the main thread, rendering useless any .subscribeOn()
operator down the stream.
The actual question:
What can I do to not only correctly wrap listeners like this into an Observable but also make them conform to the Scheduler defined by .subscribeOn()
?
Thank you!
Update:
I know .observeOn()
gives me the ability to process the data returned by Firebase on another thread. That's what I'm doing already, but it just isn't the point of this question. The point is: when I pass a Scheduler through .subscribeOn()
I expect the upstream to conform to that Scheduler's thread but that doesn't happen when the Observable has an internal listener that is being triggered from a callback on a different thread. When that happens, I lose the .subscribeOn()
guarantee.
The severity of this issue may not seem obvious at first, but what if that Observable was part of a library? What's the best practice there? Should the library enforce its clients to always call an .observeOn()
after any call to that method? Should the library call an .observeOn()
itself and call it a "default Scheduler"? In any of these cases the .subscribeOn()
is just useless, and that doesn't seem right to me.