I am trying to learn how to create custom operators in rxcpp, and I was able to create operators as sited in here. But, I would like to learn how to create more generic operators implementing rxo::operator_base and using lift operator. Is there any documentation available to learn this with some simple examples?.
Asked
Active
Viewed 771 times
2
-
I looked myself a while back and could not find any. I proceeded by doing similar to the linked example, where I wrapped/composed existing operators to create my own, in addition to creating overloads of `operator |`. I was shocked and disappointed at the complexity of the existing rxcpp operators after looking at the source. Personally I'm hoping the next version drops support for C++11 which according to Kirk Shoop should make implementing many aspects of rxcpp, including operators, much simpler. – Rotsiser Mho May 13 '18 at 23:44
-
1@RotsiserMho I tried to explore on this topic, but it is observed that the custom operators using `rx::operator_base` requires modification in the library code. In future releases of RxCpp it would have been helpful for us, if they introduce interface oriented approach as in RxJava and Rx.Net to customize the operators rather than only sticking to this composition of standard operators. – Peter Abraham May 16 '18 at 11:31
2 Answers
1
Here`s a way to use rxcpp v2 observerable lift function :
class MyTestOp //: public rxcpp::operators::operator_base<int>
{
public:
MyTestOp(){}
~MyTestOp(){}
rxcpp::subscriber<int> operator() (rxcpp::subscriber<int> s) const {
return rxcpp::make_subscriber<int>([s](const int & next) {
s.on_next(std::move(next + 1));
}, [&s](const std::exception_ptr & e) {
s.on_error(e);
}, [&s]() {
s.on_completed();
});
}
};
int main()
{
auto keys = rxcpp::observable<>::create<int>(
[](rxcpp::subscriber<int> dest){
for (;;) {
int key = std::cin.get();
dest.on_next(key);
}
}).
publish();
keys.lift<int>(MyTestOp()).subscribe([](int key){
std::cout << key << std::endl;
});
// same as use class
//keys.lift<int>([](rxcpp::subscriber<int> s) {
// return rxcpp::make_subscriber<int>([s](const int & next) {
// s.on_next(std::move(next + 1));
// }, [&s](const std::exception_ptr & e) {
// s.on_error(e);
// }, [&s]() {
// s.on_completed();
// });
//}).subscribe([](int key){
// std::cout << key << std::endl;
//});
// run the loop in create
keys.connect();
return 0;
}
since it is base on template check, you don`t need to inherit from any interface, just implement an operator() like before will be ok.
And I think the author would prefer you to use the way in the comments.
And maybe I should use has subscribe check ... any way ...
if(!s.isUnsubscribed()) { /*call s.on_xxx*/ }

Wang Evander
- 11
- 1
0
I found the following slide from Kirk's 2016 presentation quite helpful, even though it is about rxcppv3 rather than v2.
Sequence concepts
struct observable {
void bind(observer);
};
struct observer {
template<class T>
void next(T);
};
struct lifter {
observer lift(observer);
};
Sequence Implementations
const auto ints = [](auto first, auto last){
return make_observable([=](auto r){ // Define observable::bind
for(auto i = first;; ++i){
r.next(i);
if (i == last) break;
}
});
};
const auto copy_if = [](auto pred){
return make_lifter([=](auto r){
return make_observer(r, [=](auto& r, auto v){ // Define observer::next
if (pred(v)) r.next(v);
});
});
};

Tom Huntington
- 2,260
- 10
- 20