
I am a bit confused about the flow of the code in this case. Please refer to the code below.

import { fromEvent } from 'rxjs';
import { mapTo } from 'rxjs/operators';

let btn = document.getElementById("btnclick");
let btn_clicks = fromEvent(btn, 'click');
let positions = btn_clicks.pipe(mapTo ("Testing MapTo"));
positions.subscribe(x => console.log(x));

It outputs Testing MapTo when I click the button. At first I thought that whenever the button is clicked, the .pipe() method is called and then the .subscribe() method is called, which prints the output to the console. However, I read that

subscribe is for activating the observable and listening for emitted values

in the link.

Questions:

  1. Since .subscribe() is called after .pipe(), how does this work? I mean, the value is only received after .subscribe(), so how does it go through .pipe()? Please help me understand this.
  2. What happens when we make an HTTP call like this (say the returned observable is x):
this.http.get('url')
    .pipe(
      filter(x => x % 2 === 0),
      map(x => x + x)
    )
    .subscribe(x => {
        console.log(x);
    });
KL_KISNE_DEKHA_HAI

1 Answer


The flow becomes clear once you understand what pipe means. It does what it says: it pipes operators in between the values emitted by the source and the subscription.

source --->     modify results by       ---> subscription
               piping in operators

Question 1

click  --->  map (convert) click event     --->   console.log(x)
event        to string "Testing MapTo"

Question 2

   Http     --->  go forward only if   --->   map (convert)     ---> console.log(x)
observable        number is even            result = 2 * result

Update (Cold Observable)

When a cold observable is subscribed to (e.g. positions.subscribe(x => ...)), the subscribe method of the Observable class is called. After that, the callback functions sit idle and wait for the observable to emit. This is the key point. The subscription sets the observable's statements in motion, but it does not know when the observable will emit. This is why an observable is described as a push-based way of obtaining a value: the observable pushes the output when it is ready.

Once the subscribe function in the Observable class is triggered, the operators passed to pipe come into play. Remember that the source observable isn't subscribed to directly; the call is someObs.pipe(...).subscribe(). It is akin to the statement someData.getSum().getAverage(): the result of getSum() applied to someData is the input for getAverage(). In the same way, pipe applies the operators to the source observable and returns a new, modified observable without emitting anything itself. The subscription then waits on this new observable.
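To make that ordering concrete, here is a minimal sketch of the idea. MiniObservable, Operator, and the hand-written filter and map below are hypothetical stand-ins written for this illustration; they are not RxJS's actual implementation, which also handles errors, completion, and unsubscription. The sketch only shows that pipe() builds a wrapped observable, that subscribing travels upward from the last operator to the source, and that values then travel back down.

```typescript
// Simplified stand-in for an observable; NOT how RxJS is really implemented.
type Observer<T> = (value: T) => void;
type Operator<A, B> = (source: MiniObservable<A>) => MiniObservable<B>;

const log: string[] = [];

class MiniObservable<T> {
  private producer: (observer: Observer<T>) => void;

  constructor(producer: (observer: Observer<T>) => void) {
    this.producer = producer;
  }

  // Nothing runs until subscribe() calls the producer.
  subscribe(observer: Observer<T>): void {
    this.producer(observer);
  }

  // pipe() emits nothing; it wraps this observable in new ones, left to right.
  pipe<R>(...operators: Operator<any, any>[]): MiniObservable<R> {
    return operators.reduce(
      (source: MiniObservable<any>, op) => op(source),
      this as MiniObservable<any>
    ) as MiniObservable<R>;
  }
}

// Each operator returns a NEW observable that subscribes to its source on demand.
const filter = <T>(keep: (v: T) => boolean): Operator<T, T> => (source) =>
  new MiniObservable<T>((observer) => {
    log.push("filter: subscribing upstream");
    source.subscribe((v) => {
      if (keep(v)) observer(v);
    });
  });

const map = <A, B>(fn: (v: A) => B): Operator<A, B> => (source) =>
  new MiniObservable<B>((observer) => {
    log.push("map: subscribing upstream");
    source.subscribe((v) => observer(fn(v)));
  });

const source = new MiniObservable<number>((observer) => {
  log.push("source: starts emitting");
  for (const n of [1, 2, 3, 4]) observer(n);
});

const doubledEvens = source.pipe<number>(
  filter<number>((x) => x % 2 === 0),
  map<number, number>((x) => x + x)
);
log.push("pipe() returned, nothing emitted yet");

doubledEvens.subscribe((x) => log.push(`received ${x}`));
console.log(log);
```

The log shows the subscription reaching map first, then filter, then the source, and only after that do the values 4 and 8 arrive in the callback. That is the "bottom-up" subscription and "top-down" data flow in one run.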

Informal example

Ordering a pizza scenario

+----------------------------------+---------------------------------------+
|          Ordering pizza          |       Subscribing to observable       |
+----------------------------------+---------------------------------------+
| Place order                      | Subscribe to observable               |
| Kitchen worker starts processing | The observable starts processing      |
| No onions                        | Filter some values                    |
| Double cheese                    | Map the values to return (2 * values) |
| Some other toppings              | Some other operators                  |
| Receive the pizza                | Receive the value                     |
+----------------------------------+---------------------------------------+

Two important things to note here:

  1. You order the pizza and wait. You do not know when you'll receive it. In the same way, the subscription does not know when it will receive the data. That's what makes it asynchronous. All subscriptions wait, regardless of their properties (fortunately, RxJS doesn't discriminate based on anything).

  2. Now, it is easy to mistakenly assume that the subscription has nothing to do with the observable: that it's just a watcher waiting to receive the data while the observable has already started processing its statements. That would be wrong. If that were the case, the subscription callbacks would receive the data as soon as you subscribe. In reality, the observable doesn't start until it is subscribed to.
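The second point can be sketched with a plain function standing in for a cold observable. The names Subscribe, pizzaOrder, and kitchenRuns are made up for this illustration and are not part of RxJS. For brevity the value is delivered synchronously here; with a real click or HTTP source it would arrive at some later time.

```typescript
// A cold "observable" modelled as a plain function: calling it IS subscribing.
type Subscribe<T> = (next: (value: T) => void) => void;

let kitchenRuns = 0; // how many times the producer body has executed

const pizzaOrder: Subscribe<string> = (next) => {
  kitchenRuns += 1; // the kitchen starts working only when someone subscribes
  next(`pizza #${kitchenRuns}`);
};

const received: string[] = [];

// Defining pizzaOrder above did not start the kitchen.
const runsBeforeSubscribe = kitchenRuns;

pizzaOrder((p) => received.push(p)); // first subscriber places an order
pizzaOrder((p) => received.push(p)); // second subscriber triggers a fresh run

console.log(runsBeforeSubscribe, kitchenRuns, received);
```

Nothing is produced before the first call, and each subscriber gets its own run of the producer, which is the behaviour the answer describes for a cold observable.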

ruth
  • Hello @Michael D, just wanted to clarify two more things: whenever we call subscribe, the subscriber function inside the observable is called, right? So the first time the value appears would be in 'x' inside the ... subscribe(x =>) ... function. So when does pipe come into the picture? Does it go bottom-up, kind of? – KL_KISNE_DEKHA_HAI Jun 06 '20 at 17:28
  • And what is Subscription here as you mentioned in your answer? – KL_KISNE_DEKHA_HAI Jun 06 '20 at 17:28
  • @KL_KISNE_DEKHA_HAI: In the data flow I mentioned, the `console.log(x)` functions would be the subscription callback functions. For the other two questions, I've updated the answer. Hopefully it helps you understand better. – ruth Jun 06 '20 at 21:49
  • Hey @Michael D, thanks man. It did help a lot. Can you upvote my question also :). So that it reaches more number of people. – KL_KISNE_DEKHA_HAI Jun 07 '20 at 06:50
  • Hello @Michael D, can you help me with this question : https://stackoverflow.com/questions/62394625/unit-testing-asynchronous-code-including-setinterval-in-angular – KL_KISNE_DEKHA_HAI Jun 15 '20 at 18:29