I have a Flowable<T>. I want to ignore errors that are subtypes of IgnoreThisError, so that the stream completes gracefully; all other errors should be propagated downstream.

Kotlin example:

val f : Flowable<T> = ...
val g = f.onErrorComplete { it is IgnoreThisError }

An onErrorComplete function like this is exactly what I need, but it does not exist...

user3612643

2 Answers


Something like this should work:

f.onErrorResumeNext {
    if (it is IgnoreThisError)
        Flowable.empty()
    else
        Flowable.error(it)
}

I am not sure whether Kotlin can infer the types properly here, given the overloads of onErrorResumeNext and Java's emulation of variance in its signatures. If it cannot, you may need to be more explicit:

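// Function here is RxJava's io.reactivex.functions.Function (assuming RxJava 2), not kotlin.Function;
// Publisher is org.reactivestreams.Publisher.
f.onErrorResumeNext(Function<Throwable, Publisher<T>> {
    if (it is IgnoreThisError)
        Flowable.empty()
    else
        Flowable.error(it)
})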
Alexey Romanov

Here is the solution for the missing onErrorComplete as an extension function:

/**
 * Errors encountered in the stream for which the provided `predicate` returns true will be silently turned into graceful completion.
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
inline fun <T> Flowable<T>.onErrorComplete(crossinline predicate: (Throwable) -> Boolean): Flowable<T> =
  onErrorResumeNext { error: Throwable ->
    if (predicate(error)) Flowable.empty<T>() else Flowable.error<T>(error)
  }

Note that I had to declare the lambda parameter explicitly as error: Throwable ->; otherwise the compiler complains because it cannot choose between the overloads of onErrorResumeNext.
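
For illustration, here is a minimal usage sketch of the extension. It assumes RxJava 2 (the io.reactivex package) is on the classpath. IgnoreThisError is defined here as a hypothetical stand-in for the question's error type, and the extension is repeated without its annotations so the snippet is self-contained.

```kotlin
import io.reactivex.Flowable

// Hypothetical error type standing in for the question's IgnoreThisError.
class IgnoreThisError : RuntimeException("ignored")

// Same body as the extension function above, annotations omitted for brevity.
inline fun <T> Flowable<T>.onErrorComplete(crossinline predicate: (Throwable) -> Boolean): Flowable<T> =
    onErrorResumeNext { error: Throwable ->
        if (predicate(error)) Flowable.empty<T>() else Flowable.error<T>(error)
    }

fun main() {
    // An IgnoreThisError after two items is turned into a normal completion.
    Flowable.just(1, 2)
        .concatWith(Flowable.error<Int>(IgnoreThisError()))
        .onErrorComplete { it is IgnoreThisError }
        .test()
        .assertValues(1, 2)
        .assertComplete()

    // Any other error is still propagated downstream.
    Flowable.error<Int>(IllegalStateException("boom"))
        .onErrorComplete { it is IgnoreThisError }
        .test()
        .assertError(IllegalStateException::class.java)
}
```

The test() and assert calls come from RxJava's TestSubscriber and throw an AssertionError if the stream behaves differently. If your RxJava version already provides an onErrorComplete member on Flowable, that member takes precedence over an extension with the same name.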

user3612643