I'm trying to extract some common logic, based on RxJava2, into reusable components. Let's imagine I have the following piece of code:

someSingle
    .doOnSuccess { /* update UI based on side effect */ }
    .subscribeOn(...)
    .observeOn(...)
    .subscribe(
        { value -> /* update UI based on value */ },
        { throwable -> /* handle error */ }
    )

I want to wrap this into a reusable component that exposes a method returning a Flowable of events. Clients will receive the events and update the UI accordingly. My goal is to keep the reusable component free of any reference to the view. I want the method to look something like this:

fun reusableMethod(...) : Flowable<Event> { ... }

Event is a sealed class enclosing two subtypes: SideEffectEvent and ValueEvent.
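For illustration, the hierarchy might look like the sketch below. The `String` payload on `ValueEvent` and the `describe` helper are assumptions made for the example; they are not part of the real code.

```kotlin
// Hypothetical shape of the Event hierarchy described above.
sealed class Event {
    // Carries no data; it only signals that the side effect happened.
    class SideEffectEvent : Event()

    // The String payload is an assumption made for this sketch.
    data class ValueEvent(val value: String) : Event()
}

// Hypothetical client-side handler. Because Event is sealed,
// the `when` expression is exhaustive without an `else` branch.
fun describe(event: Event): String = when (event) {
    is Event.SideEffectEvent -> "side effect"
    is Event.ValueEvent -> "value: ${event.value}"
}
```

A client subscribed to the Flowable would branch on the event type in the same way to decide which part of the UI to update.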

What is the best way to transform the stream from the first snippet so that both the side effect and the value are emitted as items of the Flowable?

Currently, I have the following solution, but I'm not very happy with it, because it looks a bit clunky and complex:

private val sideEffectEvents = PublishProcessor.create<SideEffectEvent>()

fun reusableMethod(...) = 
    Flowable.merge(
        someSingle.doOnSuccess { sideEffectEvents.onNext(SideEffectEvent()) },
        sideEffectEvents
    )
    .subscribeOn(...)
    .observeOn(...)

I have also considered some alternatives:

  • Notify the client about SideEffectEvents through a callback passed to reusableMethod(). This looks unnatural, and exposing both a callback and a stream to subscribe to is poor style.
  • Use a single PublishProcessor: post side effects to it and use it to subscribe to the original Single. The reusable component would expose a cleanUp() method so the client can dispose of the stream when it decides to.

I'm looking forward to suggestions and ideas.

Danail Alexiev

2 Answers

First of all, it doesn't have to be a Flowable; a simple Observable would do. The solution below should work in both cases. For more on the difference, see "Observable vs Flowable".

This code is not tested; I wrote it to give you a simplified idea of how you can achieve this.

// a sealed class representing current state 
sealed class ViewState {
    object Loading : ViewState() // an object, because the loading state carries no data
    data class Success(val data: List<Model>) : ViewState()
    data class Error(val t: Throwable) : ViewState()
}

// an Observable or Flowable that emits a single type, ViewState
// every emission is a ViewState holding either the data, an error, or the loading state
return service.getData()
    .map<ViewState> { data -> ViewState.Success(data) } // on successful data fetch; widened to ViewState so the calls below type-check
    .startWith(ViewState.Loading) // show loading on start of fetch (Loading is an object, so no parentheses)
    .onErrorReturn { exception -> ViewState.Error(exception) } // return error state
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())

// somewhere in an Activity (or in several activities), subscribe to the stream above
.subscribe { viewState ->
    when (viewState) {
        is ViewState.Loading -> showProgressView()
        is ViewState.Error -> showErrorView(viewState.t)
        is ViewState.Success -> showData(viewState.data)
        // no else branch needed: ViewState is sealed, so these cases are exhaustive
    }
}
Atiq
  • It has to be a Flowable to fit into the app architecture. And your example is working flawlessly, but I have multiple streams chained with `flatMap()` and I need to propagate events from previous streams to the subscriber as well. – Danail Alexiev Jan 14 '20 at 14:14
  • Now, this situation is too broad for me to provide an answer. If you can update the question accordingly and provide clear context for the problem, then maybe I or someone else will be able to help. – Atiq Jan 14 '20 at 14:26

How about this:

Before:

someSingle
    .operation1()
    .operation2()
    .doOnSuccess { /* update UI based on side effect */ }
    .operation3()
    .operation4()
    .subscribeOn(...)
    .observeOn(...)
    .subscribe(
        { value -> /* update UI based on value */ },
        { throwable -> /* handle error */ }
    )

Reusable:

fun reusableMethod(...): Flowable<Event> = 
    someSingle
        .operation1()
        .operation2()
        .flatMapPublisher {
            Single.concat(
                Single.just(getSideEffectEvent(it)),
                Single.just(it)
                    .operation3()
                    .operation4()
                    .map { value -> getValueEvent(value) }
            )
        }
        .subscribeOn(...)
        .observeOn(...)

You can simplify this further by using Flowable#startWith and avoiding Single#concat().
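A minimal sketch of that startWith variant, assuming RxJava 2 is on the classpath. The `Event` subtypes, the `Single<Int>` input, and the doubling step that stands in for operation3/operation4 are all hypothetical placeholders, not the original code.

```kotlin
import io.reactivex.Flowable
import io.reactivex.Single

// Hypothetical event types; the String payload is an assumption.
sealed class Event {
    class SideEffectEvent : Event()
    data class ValueEvent(val value: String) : Event()
}

// Emits a SideEffectEvent as soon as the upstream Single succeeds,
// then the ValueEvent produced by the remaining operations.
fun reusableMethod(someSingle: Single<Int>): Flowable<Event> =
    someSingle
        .flatMapPublisher { upstream ->
            Single.just(upstream)
                .map { it * 2 } // stand-in for operation3/operation4
                .map<Event> { Event.ValueEvent(it.toString()) }
                .toFlowable()
                // Prepend the side effect event to the value stream.
                .startWith(Event.SideEffectEvent())
        }
```

Because startWith sits inside flatMapPublisher, the side effect event is only emitted once the upstream has succeeded, and an upstream error still reaches the subscriber's onError unchanged. In RxJava 3 the single-item overload is named startWithItem.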

Sanlok Lee