0

I'm an absolute rxjs beginner. For me to start learning to think in observables, I need to translate concepts through code examples. I feel like if I can see the code for this, I can start to do this on my own with other concepts.

I do NOT want to CONVERT a promise to an observable, I want to make a new implementation using Observable that can behave like a promise. How would I re-write the following using Observables?

 constructor(){
    let makeMessage2 = function(){
        return new Promise(resolve, reject){
           setTimeout(()=>{
                  var r = Math.random();
                  resolve("message two plus random value: " + r );
           }, 1000);
        }
    }
    this.logMessageAndResultOfCallback("message one!", makeMessage2);
}
private sideEffect1:string = "";
private sideEffect2:string = "";

logMessageAndResultOfCallback( message1:string, callback:Function ){
    console.log(message1);
    this.sideEffect1 = message1;

    callback().then((message2)=>{
          console.log(message2);
          this.sideEffect2 = message2;
    }
}

I guess the part I don't understand is how to define the "callback" function, how to invoke it. I understand I would wait for the complete or emit handlers, like makeMessage2().subscribe(message2 => console.log(message2)); but I don't have any clue how to define makeMessage2.

This may be totally clueless question, but I've read about 10 different intros to rxjs and this hasn't quite clicked. I just need to map this scenario to observable pattern and I think I can understand it.

Basically, I want to define an observable function myObs() that does not "execute immediately" but "executes" whenever the someMethod(message:string,obs:Observable) is executed. When myObs is executed, it should do something ansynchronously within it (like get the result of a HTTP request) then set the next value, then fire a complete() so my observer defined in someMethod can handle the complete and do something with the result.

Edit: I'm not concerned with the timer or native equivalents in rxjs, this is just to simulate any async action, like getting data from the server.

FlavorScape
  • 13,301
  • 12
  • 75
  • 117
  • `Observable.create` takes subscriber function -- that executes to whoever subscribes. Dose this not work for you? Your question is not really clear. On rxjs docs site there is a good wizard on how to start observable from different things – aarosil Mar 30 '17 at 23:23
  • "promise callback pattern" also very confusion.. usually those are 2 different approach for async code – aarosil Mar 30 '17 at 23:24
  • So, I ended up doing my own attempt. Does this seem correct? – FlavorScape Mar 30 '17 at 23:39
  • Exactly, it's potentially two async behaviors. In my use case, it is dismissing a UI item after the "callback" is done computing, but does not call the callback until user action. – FlavorScape Mar 30 '17 at 23:45
  • Yeah that is what I meant. Just fyi though for this example thing though you don't need setTimeout at all, there is build in rx operator `delayTime` – aarosil Mar 31 '17 at 00:51
  • that's just to simulate some async action, like getting something from the server or etc. I'm not actually concerned with timers. – FlavorScape Mar 31 '17 at 16:04

2 Answers2

0

The code that you wrote and want to 'translate' to observables probably would not work. callback is a promise, not a function, so you can't write callback().

Did you try this introduction too? It worked for many people.

To answer your question, you could write

Rx.Observable.of(""message one!", "message two!")
  .map(console.log.bind(console)) // would be better to use `do` operator actually, for semantic reasons, but that works the same here
  .subscribe(noop, noop, noop)

or

Rx.Observable.of(""message one!", "message two!")
  .subscribe(console.log.bind(console), noop, noop)

where noop is a function that does not do anything, i.e. function noop(){}

In short, your stream emits data, that data flow through a series of operators, and the data flow is started by .subscribe. In your case, you have nothing interesting to do on subscription, because all you do is logging.

Rxjs Streams are in fact callback-based under the hood. You want to check this answer to understand it.

Community
  • 1
  • 1
user3743222
  • 18,345
  • 5
  • 69
  • 75
  • Oops, meant to make a function that returns a promise. Edited code. I don't think this is what I'm looking for. The intended use case is a function that has some initial side effect (logs message1), and requires a "callback" to be invoked with an asynchronous resolution. The resolution of that callback needs to invoke behaviors/side effects that are agnostic to the callback functions internals. I'm not simply trying to print two messages. – FlavorScape Mar 30 '17 at 23:03
  • No, I need side effects. your example can't do this. I just made my example have concrete side effects for illustration. – FlavorScape Mar 30 '17 at 23:16
  • I added my attempt. does this seem to match the pattern I propose? – FlavorScape Mar 30 '17 at 23:47
0

I solved it with help from this guide.

import {Observable} from 'rxjs';
var makeMessage2 = Observable.create(observer => {
  // Yield a single value and complete
  setTimeout(function(){
     let r = Math.random();
     observer.next("message two plus random value: " + r );
     observer.complete();
  }, 1000);
  return () => console.log('disposed')
});
logMessageAndResultOfCallback( "some message one", makeMessage2);


logMessageAndResultOfCallback( message1:string, callback:Observeable ){
    console.log(message1);
    this.sideEffect1 = message1;

    var subscription = callback.subscribe(
       (value)=>{this.sideEffect2 = value;},
       (e) =>{ console.log('onError: %s', e)},
       () => {console.log(this.sideEffect2);});
      subscription.dispose();
}
FlavorScape
  • 13,301
  • 12
  • 75
  • 117