5

What is the proper way to wait until all the observers on_completed are called if the observers are using observe_on(rxcpp::observe_on_new_thread()):

For example:

{
    Foo foo;
    auto generator = [&](rxcpp::subscriber<int> s)
    {
        s.on_next(1);
        // ...
        s.on_completed();
    };
    auto values = rxcpp::observable<>::create<int>(generator).publish();
    auto s1 = values.observe_on(rxcpp::observe_on_new_thread())
                    .subscribe([&](int) { slow_function(foo); }));

    auto lifetime = rxcpp::composite_subscription();
    lifetime.add([&](){ wrapper.log("unsubscribe");  });
    auto s2 = values.ref_count().as_blocking().subscribe(lifetime);

    // hope to call something here to wait for the completion of
    // s1's on_completed function
}

// the program usually crashes here when foo goes out of scope because 
// the slow_function(foo) is still working on foo.  I also noticed that
// s1's on_completed never got called.

My question is how to wait until s1's on_completed is finished without having to set and poll some variables.

The motivation of using observe_on() is because there are usually multiple observers on values, and I would like each observer to run concurrently. Perhaps there are different ways to achieve the same goal, I am open to all your suggestions.

Rook
  • 5,734
  • 3
  • 34
  • 43
LMC
  • 302
  • 3
  • 13

1 Answers1

4

Merging the two will allow a single blocking subscribe to wait for both to finish.

{
    Foo foo;
    auto generator = [&](rxcpp::subscriber<int> s)
    {
        s.on_next(1);
        s.on_next(2);
        // ...
        s.on_completed();
    };

    auto values = rxcpp::observable<>::create<int>(generator).publish();

    auto work = values.
        observe_on(rxcpp::observe_on_new_thread()).
        tap([&](int c) {
            slow_function(foo);
        }).
        finally([](){printf("s1 completed\n");}).
        as_dynamic();

    auto start = values.
        ref_count().
        finally([](){printf("s2 completed\n");}).
        as_dynamic();

    // wait for all to finish
    rxcpp::observable<>::from(work, start).
        merge(rxcpp::observe_on_new_thread()).
        as_blocking().subscribe();
}

A few points.

the stream must return the same type for merge to work. if combining streams of different types, use combine_latest instead.

the order of the observables in observable<>::from() is important, the start stream has ref_count, so it must be called last so that the following merge will have subscribed to the work before starting the generator.

The merge has two threads calling it. This requires that a thread-safe coordination be used. rxcpp is pay-for-use. by default the operators assume that all the calls are from the same thread. any operator that gets calls from multiple threads needs to be given a thread-safe coordination which the operator uses to impose thread-safe state management and output calls.

If desired the same coordinator instance could be used for both.

Kirk Shoop
  • 1,274
  • 9
  • 13
  • Thanks for the suggestion. When I tried your solution, the compiler complains that "tap" is not a member of the connectabile_observable. Any suggestions? – LMC Sep 03 '15 at 14:30
  • 1
    yes, tap was added in this commit ([github](https://github.com/Reactive-Extensions/RxCpp/commit/9a16ae70998723a9e271ba6c8f3884fce0352202)) – Kirk Shoop Sep 03 '15 at 15:03
  • 1
    or change tap to map([&](int c) { slow_function(foo); return c; }). – Kirk Shoop Sep 03 '15 at 15:03
  • I downloaded your latest code and it works brilliantly. Thank you. Just curious, what's the difference between "tap" and "map". I tried both in the example code above and I got the same outcome. – LMC Sep 03 '15 at 16:00
  • 2
    tap is used to peek at the stream without modifying it - thus tap can be used to perform side-effects (like slow_function). map is used to change the value or type of the stream items. if slow_function returned a Bar for every call then the new Bar could be returned from map and the stream would become observable instead of observable – Kirk Shoop Sep 03 '15 at 16:19
  • 1
    Excellent. Thanks for the awesome rxcpp library. – LMC Sep 03 '15 at 16:48
  • quick question here, because I see the advice on combine_latest and wonder: merge merges two sequences. Combine latest returns a tuple, even if one of the sequences did not generate a value (except for the first time). My question is: in order to have an heterogeneous merge, should that return a variant (as opposed to a tuple)? – Germán Diago Jan 03 '17 at 10:18
  • The default for combine latest and with latest from is to make both values available whenever either changes. To produce the variant, map each source to a variant and then merge them. The former must keep a copy of each variable, while the later does not. – Kirk Shoop Jan 03 '17 at 14:54