1

I have these two Observables in Kotlin where is just act as a timer and another one is HTTP network call response Observer.

timerDisposable = Observable.timer(daleyABCControlResetSeconds, TimeUnit.SECONDS, AndroidSchedulers.mainThread())
        .doOnNext {
            if (getABCUpdate() != null) {
                Log.d("ABC", "Media status reset after 3 seconds: ")
                updateABCResponse(getABCUpdate())
            }
        }.subscribe()

disposable = audioApi.setABCUpdate(abcUpdate)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe({
            timerDisposable.dispose()
            updateABCResponse(it)
            Log.d("ABC", "Media Status updated:")
        }, {
            Log.d("ABC", "Error updating Media Status: " + it.message)
            isABCControlChangeRequested = false
        })

I am not satisfied with this approach, can anyone please direct me right direction to use the rx's full potential. Thanks in advance.

EDIT

  Observable.combineLatest(Observable.timer(daleyABCControlResetSeconds, TimeUnit.SECONDS, AndroidSchedulers.mainThread())
            .doOnNext {
                if (getABCUpdate() != null) {
                    Log.d("ABC", "Media status reset after 3 seconds: ")
                    updateABCResponse(getABCUpdate())
                }
            },

            audioApi.setABCUpdate(abcUpdate)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread()),
            BiFunction<Long, ABCStatusUpdate, ABCStatusUpdate> { _, abcStatusUpdate ->
                abcStatusUpdate
            })
            .subscribe({
                timerDisposable.dispose()
                updateABCResponse(abcStatusUpdate)
                Log.d("ABC", "Media Status updated:")
            }, {
                Log.d("ABC", "Error updating Media Status: " + abcStatusUpdate.vol)
                isABCControlChangeRequested = false
            })
Ahmed S. Durrani
  • 1,535
  • 2
  • 17
  • 41
  • Can you tell what is `updateABCResponse(getABCUpdate())` method is doing – Rajat Beck Sep 27 '18 at 05:50
  • Its just sending data to another class using `PublishSubject` publish/listen – Ahmed S. Durrani Sep 27 '18 at 06:05
  • Could you tell more clearly what you try to do? Do you want to execute `http` request after specific `delay`? For example, wait some seconds -> execute request -> updateABCResponse? What in that case `audioApi.setABCUpdate()` perform? – ConstOrVar Sep 27 '18 at 06:36
  • @ConstOrVar basically I am trying to wait for few secs for `http` reply if received then other code doenst execute otherwise following code executes. – Ahmed S. Durrani Sep 30 '18 at 13:02

1 Answers1

1

You can use combinelatest, zip or merge for combinig. I think in your case combinelatest is suitable

Observable.combineLatest(
            Observable.timer(daleyABCControlResetSeconds, TimeUnit.SECONDS, AndroidSchedulers.mainThread())
                .doOnNext {
                    if (getABCUpdate() != null) {
                        Log.d("ABC", "Media status reset after 3 seconds: ")
                        updateABCResponse(getABCUpdate())
                    }
                },
            audioApi.setABCUpdate(abcUpdate)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread()),
            BiFunction<Long, YourApiResponseType, YourApiResponseType> { _, response ->
                response})
        .subscribe({
            timerDisposable.dispose()
            updateABCResponse(it)
            Log.d("ABC", "Media Status updated:")
        }, {
            Log.d("ABC", "Error updating Media Status: " + it.message)
            isABCControlChangeRequested = false
        })

UPD: You can change your code like this:

Observable.timer(5, TimeUnit.SECONDS, AndroidSchedulers.mainThread()).startWith(-1L)
        .doOnNext {
            if (it == -1L) return@doOnNext
            //your condition
        }
Andrii Turkovskyi
  • 27,554
  • 16
  • 95
  • 105