1

Using RxJS with Angular5, I have a service that contains an array. Some components will get created / destroyed / re-created, as the array is being populated.

So we have a service like so:

@Injectable
export class MyService {
   public events = [];
}

then we have a component like so:

@Inject(MyService)
@Component({})
export class MyComponent {

  mySub: Subscriber

  constructor(private ms: MyService){}

  ngOnInit(){

   this.mySub = Rx.Observable.from(this.ms.events).subscribe(v => {

   });

  }

}

my question is - if the events array aleady has elements in it, when the component is created, it will pick up all the existing elements, but what about elements that are added to the array after the subscription is created? How can I listen for when elements are added to the array after the fact?

If I use a Subject, the problem is I don't think it stores the historical items, just fires new ones.

Alexander Mills
  • 90,741
  • 139
  • 482
  • 817
  • looks like this question is similar -https://stackoverflow.com/questions/14466285/can-i-observe-additions-to-an-array-with-rx-js – Alexander Mills Jan 18 '18 at 01:42
  • I believe that I need to merge two different observables, but the caveat is that I need to read all the existing items in the array before receiving any new events from a Subject. – Alexander Mills Jan 18 '18 at 02:09
  • come to think of it, I think ReplaySubject is the one I am looking for that will re-broadcast previous values – Alexander Mills Jan 18 '18 at 03:58

4 Answers4

2

I would store the events within your service. This way the service can keep a history of all the events, and you can use the subject to emit that history.

@Injectable()
export class MyService {
    sub$ = new BehaviorSubject<any[]>([]);
    events: any[] = [];

    // add event to array and push to subject
    addEvent(event){
        this.events.push(event);
        this.sub$.next(this.events);
    }
    // find and remove event from array then push to subject
    removeEvent(event){
        let index = this.events.findIndex(item => {
            return event.id === item.id;
        });
        // if index is found, remove from array
        if(index !== -1){
            this.events.splice(index, 1);
            this.sub$.next(this.events);
        }
    }

}

I would use a behavior subject so the observers that subscribe receive the last emission (could also use a replay subject).

Here is a stackblitz demoing this

LLai
  • 13,128
  • 3
  • 41
  • 45
1

I think @LLai has most of what you need, but I'd change BehaviorSubject to ReplaySubject and also change the emits to single events (given Alexander Mill's answer).

This doesn't cover removing events, although I don't see mention of that requirement.

myService

@Injectable()
export class MyService {
  event$ = new ReplaySubject<any>();

  addEvent(event){
    this.event$.next(event);
  }
}

myComponent

ngOnInit(){
  this.mySub = this.ms.event$.subscribe(v => {
    ...
  });
}

Demo

const replaySubject$ = new Rx.ReplaySubject(); 
// Also, pass in a buffer size to stop blow-out (if applicable)
// const replaySubject$ = new Rx.ReplaySubject(maxBufferSize); 


// Events before subscribe
replaySubject$.next('event1');
replaySubject$.next('event2');

replaySubject$.subscribe(console.log);

// Events after subscribe
replaySubject$.next('event3');
replaySubject$.next('event4');
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>
Richard Matsen
  • 20,671
  • 3
  • 43
  • 77
0

Ok so we merge two observables using the concat operator, and it seems to work. In essence, this answer is simply re-creating a ReplaySubject from scratch. Instead of using this answer, I'd recommend using a ReplaySubject instead. In any case:

the service looks like this

@Injectable
export class MyService {

   public subject = new Rx.Subject();
   public events = [];

   addItem(v: any){
    this.events.push(v);
    this.subject.next(v);
  }
}

then the component looks like this

@Inject(MyService)
@Component({})
export class MyComponent {

  mySub: Subscriber

  constructor(private ms: MyService){}

  ngOnInit(){

    const obs = Rx.Observable.from(this.ms.events).concat(this.ms.subject);
    this.mySub = obs.subscribe(v => {

    });

  }

}

The idea is that the observable from the array is concatenated with the subject, and all the items from the array observable would fire first.

Alexander Mills
  • 90,741
  • 139
  • 482
  • 817
  • I think @LLai has the right principle of encapsulating all the observable parts in the service. – Richard Matsen Jan 18 '18 at 20:30
  • This `Rx.Observable.from(this.ms.events).concat(this.ms.subject)` looks a bit like your'e trying to do what `BehaviourSubject` or more likely `ReplaySubject` would do if you used one of those variants in your service. `Subject ... [does not] store the historical items` but ReplaySubject does. – Richard Matsen Jan 18 '18 at 20:35
  • Your use of `@Inject(MyService)` decorating the class looks novel. Any reference to this usage somewhere? – Richard Matsen Jan 18 '18 at 20:41
  • yeah idk, I am using Angular5, I was under the impression that using `@Inject` is what allows the service to be injected into the class instance, but maybe it can be omitted – Alexander Mills Jan 18 '18 at 21:25
  • @RichardMatsen yes you are right, I learned about ReplaySubject after creating this answer – Alexander Mills Jan 18 '18 at 21:26
  • I knew there had to be something built into the RxJS lib, because this seems like such a common use case – Alexander Mills Jan 18 '18 at 21:29
0

If you are worried about state, you should use a Subject which can be initialized with some data and later propagate.

@Injectable
export class MyService {
   public events = [];
   public data$: Subject<any>;

   constructor() {
     this.data$ = this.events;
   }

   addItem(v: any) {
     this.event.push(v);
     this.data$.next(this.event);
   }
}

Then in your component if you want to have a live feed of data.

@Component({//usual angular stuff here})
export class MyComponent implements OnInit {
  myData: Subject<any>;

  constructor(private myService: MyService) { }

  ngOnInit() {
    this.myData = this.myService.data$;
  }
}

You can then use either the async pipe or subscribe to myData to get your data.

Mike Tung
  • 4,735
  • 1
  • 17
  • 24
  • The service acts like a store, which produces your data, in your component you just subscribe or let angular do it for you. Any new events added will automatically trickle. – Mike Tung Jan 18 '18 at 02:40
  • This `this.data$ = this.events` is going to break your observable property. – Richard Matsen Jan 18 '18 at 21:09