0

Edited the question with a concise example. I think it'll make it easier for people to understand. Take note this example is super simplified, normally I layer things between different components, but for the question it will suffice.

Take this component. It takes the name of a fetched object, and a button to fetch the next object. To get the value of the next REST request, I know of no other way than to subscribe to the answer, and what I would like is to have something like "combineLatest" but for "the future", so I can combineLatest of later streams.

import { Component, VERSION, OnInit } from '@angular/core';
import { BehaviorSubject, Observable } from 'rxjs';
import { HttpClient } from '@angular/common/http';

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
  
    private readonly PEOPLE_API_ENDPOINT = `https://swapi.dev/api/people/`;

   private characterSubject : BehaviorSubject<any> = new BehaviorSubject<any>({name: 'loading'});
  private currentCharacter: number = 1;
  
  character$ : Observable<any> = this.characterSubject.asObservable();
  
  constructor(
    private http: HttpClient
  ) {}
    
  ngOnInit() {
    this.updateCurrentCharacter();
  }

  nextCharacter() : void {
    this.currentCharacter ++;
    this.updateCurrentCharacter();
  }

  //I would want to avoid subscribing, instead
  //I would like some sort of operation to send the stream
  //emissions to the subject. As to not break the observable
  //chain up until presentation, like the best practices say.
  private updateCurrentCharacter() : void {
    this.fetchCharacter(this.currentCharacter)
      .subscribe( 
        character => this.characterSubject.next(character)
      );
  }

  private fetchCharacter (id: number) : Observable<any> {
        return this.http.get(this.PEOPLE_API_ENDPOINT + `${id}/`);
  }
}
<span>{{ character$ | async }} </span>

<button (click)="nextCharacter()">Next character</button>

Online demo

Is there any way to do that? Doing something like "emitIn(characterSubject)". I think there is nothing like this, like dynamically add source emissions to a source.

JoshiRaez
  • 97
  • 9
  • hm I don't think I get it - should the view get data from `currentData` or `dataRepository`? – ggradnig Aug 11 '20 at 10:41
  • If you want to delay the actual subscription time you could use `publish` and `connect`. Is this something your looking for? – ggradnig Aug 11 '20 at 10:44
  • 1
    If you run into this problem often and don't want to go all in with a state management library like NgRx you might want to look into state management with pure RxJS. [Video](https://youtu.be/h-F5uYM69a4) - [Article](https://medium.com/angular-in-depth/angular-you-may-not-need-ngrx-e80546cc56ee) – frido Aug 11 '20 at 10:57
  • I think I'll edit the question with a specific example. Maybe it will be easier to understand. – JoshiRaez Aug 13 '20 at 09:50

2 Answers2

1

If I understand right, you have a service which has some method that triggers an http call, like dataRepository.fetch(id), and you have different components that need to react when the response of the call arrives.

If this is the case, there are many ways to deal with such requirement, one being to use Subjects exposed as public properties of the service, which is what I understand you want to do.

To achieve this behavior the code you have written is what you need, in other words this is OK

dataRepository.fetch(id)
  .subscribe(
    newData => currentData.next(newData)
  )

If you want to make it more complete and manage also the error and complete case, you may write it like this

dataRepository.fetch(id)
  .subscribe(currentData)

In this last form, you are passing currentData as the Observer parameter of subscribe. currentDatathough is also an Observable and therefore can next, error and complete.

If you use ReplaySubject you can add the possibility to store the last results and present them to components which are created after the notification of the result.

Picci
  • 16,775
  • 13
  • 70
  • 113
  • So there is no way other than having to go syncronous with the observable? (using subscribe). Shame D:. Although I sorta get that if you subscribe with an observable, you are getting all the advantages of the observable, so it will handle the 3 operations on its own. But doing subscribe wont it break the reactive chain? like || HTTP to Fetch -> Reactive/Asynchronous || Subscribe -> Imperative/Syncronous || Subject to async pipe -> Reactive/Asynchronous || That break in the middle is what I want to avoid. But from what you said it seems like there is no choice around it. – JoshiRaez Aug 13 '20 at 12:11
  • Btw I dont get your last example. Because the thing is every time the event triggers, I create a new fetch, but the components don't know that there is a new source, I would have to manually update every one of them, which kinda defeats the purpose of observables – JoshiRaez Aug 13 '20 at 12:12
  • 1
    @ JoshiRaez You have 1 event that you want to multiplex to n notifications. So I do not think you have any other way with RxJS. Plus I do not see that Reactive is Asynchronous while Imperative is Synchronous. RxJS can be perfectly reactive and perfectly synchronous. Consider for instance an app that reacts to a message coming from WebSockets. The reaction can be perfectly synchronous and it is still a reactive way of coding. [This article](https://christianlydemann.com/are-observables-async/) gives more details. – Picci Aug 14 '20 at 07:36
  • 1
    @ JoshiRaez regarding your last comment, you are right. `share` operator does not apply to your case. I have edited my response and removed it. – Picci Aug 14 '20 at 07:39
0

Original component retrieved observable is dynamically changed, but that would be really dangerous (any pipe could break any observable). I would need another thing not called pipe.

I disagree. you can easily create dynamic observables using functions that return an observable based on your Input data and It will no break.

function getUserFriendsById(userId: string): Observable<User[]>{
return service.getProfileById(userId).pipe(
  mergeMap(user=>{
    return getUsersByArrayOfIds(user.friends);
  })
 )
}

getUserFriendsById('exampleUserId').subscribe(friends=>{...});

but If you think If something breaks inside you can simply use CatchError.

function getUserFriendsById(userId: string): Observable<User[]>{
return service.getProfileById(userId).pipe(
  mergeMap(user=>{
    return getUsersByArrayOfIds(user.friends).pipe(
      catchError(err => {
      return of([]) // will return empty friend list if this APi call fails.
    }))
  }),
  catchError(err=>{
   return of([]); // will return empty friend list if this APi call fails.
  })
 )
}

getUserFriendsById('exampleUserId').subscribe(friends=>{...});

We use mergeMap simply because we need first Observable's Data to send second request.

If what you are looking is something to mix these observables from base then It would make all of rxjs operators useless because that's what they are here for.

But If you don't want to emit data to your currentData Observable and you know your data source then you don't even need to create separate Observable for it. Just using pipe to get the data is enough.

currentData = dataRepository.fetch(id);
// at some point when you need currentData data you will subscribe to it.

// is same as 

dataRepository.fetch(id)
  .subscribe(
    newData => currentData.next(newData)
  )

Or even when you need to call another Observable by dataRepository.fetch(id) response

dataRepository.fetch(id).pipe(
mergeMap(fetchedId=>{
    return getUserById(fetchedId);
  })
)

But still If you want to go deeper and create a flow without passing data I think you will get disappointed because It's How Observables in general work. and I guess this approach is very limited and hard to maintain because you finally reach a point where you need to send different requests to multiple API and gather data and that's when flow will break. https://www.learnrxjs.io/learn-rxjs/operators/creation/create

// RxJS v6+
import { Observable } from 'rxjs';
/*
  Create an observable that emits 'Hello' and 'World' on  
  subscription.
*/
const hello = Observable.create(function(observer) {
  observer.next('Hello');
  observer.next('World');
  observer.complete();
});

I hope it helps but let me know If it's not what you are looking for.

Eric Aska
  • 611
  • 5
  • 14
  • I think I misexplained myself with the "dynamic" thing. When I meant "dynamic", I meant something like this Original behaviour: Source will emit: 1(t0), 2(t2), 3(t3), C First observable from source pipes with First. That observable will be 1(t0), C Second observable from source pipes with Map x => x*2. That observable will bring 2(t0), 4(t2), 6(t3), C Notice how the first and second piped sources (the two ner observables) don't interact between themselves. – JoshiRaez Aug 13 '20 at 09:44
  • But if they were dynamic, what I meant that would happen is that every """"pipe"""" would change the other observables, basically changing the source. That's not how reactive works at all, I know, byt I meant that would be dangerous as well because it would couple an observable to every instance piping it. Source will emit: 1(t0), 2(t2), 3(t3), C First observable from source pipes with First. That observable will be 1(t0), C But the source would also be 1(t0), C Second observable from source pipes with Map x => x*2. That observable, the previous, and the source would become now 2(t0), C – JoshiRaez Aug 13 '20 at 09:49