83

I have two Completable. I would like to do following scenario: If first Completable gets to onComplete , continue with second Completable. The final results would be onComplete of second Completable.

This is how I do it when I have Single getUserIdAlreadySavedInDevice() and Completable login():

@Override
public Completable loginUserThatIsAlreadySavedInDevice(String password) {
    return getUserIdAlreadySavedInDevice()
            .flatMapCompletable(s -> login(password, s))

}
Maksim Ostrovidov
  • 10,720
  • 8
  • 42
  • 57
Mladen Rakonjac
  • 9,562
  • 7
  • 42
  • 55

5 Answers5

140

You are looking for andThen operator.

Returns a Completable that first runs this Completable and then the other completable.

firstCompletable
    .andThen(secondCompletable)

In general, this operator is a "replacement" for a flatMap on Completable:

Completable       andThen(CompletableSource next)
<T> Maybe<T>      andThen(MaybeSource<T> next)
<T> Observable<T> andThen(ObservableSource<T> next)
<T> Flowable<T>   andThen(Publisher<T> next)
<T> Single<T>     andThen(SingleSource<T> next)
Maksim Ostrovidov
  • 10,720
  • 8
  • 42
  • 57
  • 3
    This should be marked as correct answer. I encountered the same situation as the op and been searching answers for 2 days. Your answer @Maxim Ostrovidov is all I'm looking for. Thank you! – chip Jun 18 '17 at 08:16
  • Also, I have tried with doOnComplete() , DO NOT do it, it terminates stream – Mladen Rakonjac Jul 20 '17 at 08:46
  • 2
    The documentation doesn't indicate if the second completable is executed or not in the case of an error in the first one. Does anyone know? – BoD Oct 12 '17 at 08:00
  • 2
    @BoD If any error occurs in a stream - it gets terminated with a `onError` event. That's the general behavior of reactive streams. So, answering your question, second `Completable` won't be executed. – Maksim Ostrovidov Oct 13 '17 at 23:16
  • 11
    Hmmm, I don't think `andThen` is a replacement of `flatMapCompletable` because `andThen` takes an instance of Completable whereas the `flatMapCompletable` takes a function that creates a Completable. The former is eager, when you invoke it the Completable is created whereas in the latter Completable is constructed lazily, after the Completable completes. With the eager version, you could be calling constructing the Completable before the former has completed, couldn't you? – Galder Zamarreño Feb 07 '18 at 23:41
  • `andThen()` is a synonym for `concatWith()`, so the same error behavior applies (i.e., fail the entire stream as soon as `onError()` is triggered). – E.M. May 15 '18 at 01:05
  • 1
    Zamanneno is right. With this you have to wrap anything eager with Single.fromCallable(() -> eagerCompletable).flatMapCompletable(c -> c). This has just bitten me. – Sparky Aug 15 '18 at 19:13
  • 1
    Actually there's a Single/Completable/Observable defer() static function you can use to produce flatmap-like lazy functionality – Sparky Aug 15 '18 at 20:13
134

TL;DR: the other answers miss a subtlety. Use doThingA().andThen(doThingB()) if you want the equivalent of concat, use doThingA().andThen(Completable.defer(() -> doThingB()) if you want the equivalent of flatMap.


EDIT: a more complete reference

  • flatMap() is the mapping version of merge()
  • concatMap() is the mapping version of concat()
  • For a Completable you need defer() to make function calls lazy like in the mapping functions for Single or Observable (or preferably make it so that nothing happens until you hit subscribe - this is a good convention to follow and is used in the official Rx libraries as well as any Rx extensions I've encountered, for advanced users this refers to cold completables only but most people can ignore that).
  • the only difference between concat(a, b) and a.andThen(b) is syntax

Some examples:

  • foo(a).andThen(bar(b)) will:

    1. call foo(a)
    2. immediately call bar(b) even if the completable returned by step 1 returns an error
    3. subscribe to whatever step 1 returns
    4. Will then subscribe to the result of bar(b) only if the last step completed successfully
  • foo(a).andThen(Completable.defer(() -> bar(b)) will:

    1. call foo(a)
    2. subscribe to the result of step 1
    3. only if the completable returned by by foo(a) succeeds then calls bar(b)

I'm going to leave out the treatment of merge() since it gets a bit more complicated, but long story short that's the one to call if you want "parallelism".


The above answers are sort of correct, but I found them misleading because they miss a subtlety about eager evaluation.

doThingA().andThen(doThingB()) will call doThingB() immediately but only subscribe to the observable returned by doThingB() when the observable returned by doThingA() completes.

doThingA().andThen(Completable.defer(() -> doThingB()) will call doThingB() only after thing A has completed.

This is important only if doThingB() has side effects before a subscribe event. E.g. Single.just(sideEffect(1)).toCompletable()

An implementation that doesn't have side effects before the subscribe event (a true cold observable) might be Single.just(1).doOnSuccess(i -> sideEffect(i)).toCompletable().

In the case that's just bitten me thing A is some validation logic and doThingB() kicks off an asynchronous database update immediately that completes a VertX ObservableFuture. This is bad. Arguably doThingB() should be written to only update the database upon subscribe, and I'm going to try and design things that way in the future.

Sparky
  • 2,694
  • 3
  • 21
  • 31
  • or just wrapped method doThingB() by Observable.create() or Observable.fromCallable(). – Qing Dec 06 '18 at 18:45
  • @Qing if doThingB() returns an observable or completable those are both problematic. Create is a lower level API so you'll end up re-implementing defer(). Observable.fromCallable() will cause serious bugs: it'll run the callable then the result (the inner completable) will be ignored (unless you add a flatmap, but that's the same behavious as defer but in two calls), so it won't wait for the operation to finish and any errors will be lost. (edit: more constructive reply). – Sparky Dec 06 '18 at 19:57
  • @Sparky hmm its that so , do you have a sample code to try, never encounter that, i thought andThen will wait until the inside observable to done running – Qing Dec 06 '18 at 22:41
  • It does; but Completable.complete().andThen(Observable.fromCallable(() -> Completable.fromAction(() -> System.out.println("boo"))).subscribe() won't print anything because the inner completable is never subscribed. – Sparky Dec 06 '18 at 23:07
  • Here is an example of a bad doThingB(): Completable doThingB() { System.out.println("first half of work"); return Completable.fromAction(() -> System.out.println("second half of work"); }. The developer thinks it prints two lines on success or no lines on failure because that is how flatMap would work. But Completable.complete().andThen(Observable.fromCallable(() -> doThingB())).subscribe() and Completable.error( new RuntimeException()).andThen(doThingB()).subscribe() will print "first half of work". To reproduce the behaviour of flatmap you need to use Completable.defer(() -> doThingB()). – Sparky Dec 06 '18 at 23:53
  • But I do agree that it would be better to refactor doThingB() if that is what you mean. – Sparky Dec 07 '18 at 00:26
  • Just to confirm, if I use `.andThen(Completable.defer(()->doThingB())`, `doThingB()` won't be called if `doThingA()` failed and called `onError()`, right? – Jack Jan 08 '19 at 23:38
  • Pretty sure: ` Completable.fromAction(() -> System.out.println("foo")) .andThen(Completable.error(new RuntimeException("bam!"))) .andThen(Completable.fromAction(() -> System.out.println("Hello World"))).await();` – Sparky Jan 18 '19 at 14:37
  • `defer()` is the second order equivalent of `fromAction()` the same way that `flatMap()` is the second order equivalent of `map()`, so they're equivalent here. – Sparky Jan 18 '19 at 14:41
  • I wonder why there is no such method for `Completable` (like `flatMap()` for `Single` and `Observable`) which would explicitly convey our intention to have `doThingB()` executed only after `doThingA()` was completed. Using `.defer()` complicates the code and may be forgotten and lead to subtle bugs. – Varvara Kalinina Mar 14 '19 at 17:26
  • I think it's basically because in "pure" reactive all observables and completables are "meant" to be lazy anyway, and if that's true then there's no reason to need such a function. By fully lazy I mean nothing happens until "subscribe()" is called, and if "subscribe()" is called again then the operation is repeated. If you look elsewhere in the framework there are many other places that don't work unless the observable is fully lazy (EG 'retry()'). Using quotes because in practice hardly anyone seems to actually do this, and it's not exactly made obvious that we should do this. – Sparky Mar 14 '19 at 17:36
  • Encountered this today where `doThingB()` was called immediately and caused a bug. Ended up using defer and reading this answer just gave me reassurance. Very well explained. Thank you! – Saurabh Shrivastava Jan 04 '20 at 09:22
  • Is this a correct statement `Use doThingA().andThen(doThingB()) if you want the equivalent of concat`. from what I understand `concat` is doing in sequential and `flatmap` doesn't ..? – Jongz Puangput Jan 31 '20 at 04:49
  • @JongzPuangput deleted my original because it was wrong. I've updated my answer. In short: yes, `andThen()` is identical to `concat()` but with different syntax. There's no real equivalent of `flatMap()` or `concatMap()` for completable, but `defer()` replicates the lazy behaviour that a lot of people use `flatMap()` for. – Sparky Jan 31 '20 at 09:48
  • @Sparky, this is wrong: "immediately call bar(b) even if the completable returned by step 1 returns an error" Try this one: `Completable.error(new RuntimeException("Boom")).andThen((CompletableObserver o) -> System.out.println("Never happens")).subscribe();` – igobivo Jul 06 '21 at 17:10
  • @igobivo nope, assuming you're thinking of the example `foo(a).andThen(bar(b))` you've misread it, it's standard function precedence not anything reactive. You're thinking of what would happen if it was `foo(a).andThen((CompletableObserver o) -> bar(b))`. The issue comes when people make a beginner mistake and define `bar(b)` as something like `Completable bar(String b) { if(someDatabaseOperation(b)) { return Completable.complete(); } else { return Completable.error(); } }`. I've seen this at least twice in the wild. – Sparky Jul 07 '21 at 11:15
  • Hi @Sparky, I tried Completable.defer { doSecondCompletable() } but doSecondCompletable is not getting called. Did i miss something? – etomun Jul 18 '21 at 09:11
  • @etomun the only thing I can think of is to ask if you've checked that the result is being subscribed to? ``` Completable c = Completable.complete(); System.out.println("Point A"); Completable e = Completable.defer(() -> { System.out.println("Point B"); return Completable.fromAction(() -> System.out.println("Point C")); }); System.out.println("Point D"); c.andThen(e).subscribe(); System.out.println("Point E"); ``` prints A->D->B->C->E for me. – Sparky Jul 19 '21 at 10:23
5

Try

Completable.concat

Returns a Completable which completes only when all sources complete, one after another.

http://reactivex.io/RxJava/javadoc/io/reactivex/Completable.html#concat(java.lang.Iterable)

Deni Erdyneev
  • 1,779
  • 18
  • 26
  • Concat will lead to the same problem as described above. It will be resolved immediately. So if you really depend on your call before you should stick to doThingA().andThen(Completable.defer(() -> doThingB()). – Highriser Feb 20 '19 at 06:23
5

Observe that the answer that has more votes here is a little bit misleading. See the example below, my idea is to show some testing scenarios and show how the completable logic with the operator andThen behaves.

 private fun doThingAFail(): Completable {
        print("Do thingA Called\n")
        return Completable.fromCallable {
            print("calling stream A\n")
            throw(Exception("The excep"))
        }
    }

    private fun doThingB(): Completable {
        print("Do thingB Called\n")
        return Completable.fromCallable {
            print("calling stream B\n")

        }
    }

    private fun doThingA(): Completable {
        print("Do thingA Called\n")
        return Completable.fromCallable {
            print("calling stream A\n")
        }
    }

Observe that for the test below:

@Test
fun testCallAPlusB() {
    doThingA().andThen(doThingB())
}

the output would be:

Do thingA Called
Do thingB Called

Quick note here: Observe that we are not subscribing to these Completables in this snippet.

For the test:

@Test
fun theTestSubscribe() {
    doThingA().andThen(doThingB()).subscribe()
}

The output would be:

Do thingA Called
Do thingB Called
calling stream A
calling stream B

And lastly, in case first completable fails, the second completable would not be executed.

@Test
fun theTestFailThingA() {
    doThingAFail().andThen(doThingB()).subscribe()
}

the output would be:

Do thingA Called
Do thingB Called
calling stream A

The key concept here is that the logic inside the method and inside the observable are executed not at the same time. "Do thingA Called" and "Do thingB Called" lines will be printed once we call doThingA() and doThingB() methods. While the "calling stream A" and "calling stream B" lines will only be called when someone subscribes to doThingA and doThingB methods.

The second concept here is how the andThen operator will handle errors. In the example above, in case doThingA() completable ends up with an error, the stream will end and not print the "calling stream B" line.

0

I had the same problem and I had use the operator .concactWith to make it work. In my case, I have two fun of type Completable.

fun makeTwoThings(): Completable {
    makeFirstThing().concatWith(makeSecondThing())
}

fun makeFirstThing(): Completable{
     //TODO()
}

fun makeSecondThing(): Completable{
     //TODO()
}