60

I am quite new to angular and rxjs. I am trying to create an angular2 app that gets some data from staticly served text file(Locally on server), which I would like to retrieve and map to Datamodel using Angular2's http provider and rxjs's map at a fixed time interval(5000). To reflect any changes to the served txt file.

With rxjs 4.x I know you could use Observable.interval(5000) to do the job, but it does not seem to exist in rxjs 5. My workaround currently refresh the whole application using <meta http-equiv="refresh" content="5" > Which reloads the whole page, and thus reloads the data.

So what I would really like is some way to do this working with observables, maybe to check if any changes have happened. or just to reload the data anew.

Any help or other/better way will be very much appreciated.

What I have so far:

@Injectable()
export class DataService {

    constructor(private http:Http){}

    getData(url) {
        return this.http.get(url)
            .map(res => {
                return res.text();
            })
            .map(res => {
                return res.split("\n");
            })
            .map(res => {
                var dataModels: DataModel[] = [];
                res.forEach(str => {
                    var s = str.split(",");
                    if(s[0] !== "") {
                        dataModels.push(new DataModel(s[0], parseInt(s[1]), parseInt(s[2])));
                    }
                });
                return dataModels;
            })
    }
}

@Component({
selector: 'my-app',
template: `Some html to display the data`,
providers: [DataService],
export class AppComponent {

data:DataModel[];

constructor(dataService:DataService) {}

ngOnInit() {
    this.dataService.getData('url').subscribe(
        res => {
            this.data= res;

        },
        err => console.log(err),
        () => console.log("Data received")
        );
    }
}

Dependencies: package.json

"dependencies": {
  "angular2": "^2.0.0-beta.3",
  "bootstrap": "^4.0.0-alpha.2",
  "es6-promise": "^3.0.2",
  "es6-shim": "^0.33.13",
  "jquery": "^2.2.0",
  "reflect-metadata": "^0.1.2",
  "rxjs": "^5.0.0-beta.0",
  "systemjs": "^0.19.20",
  "zone.js": "^0.5.11"
},
"devDependencies": {
  "typescript": "^1.7.5"
}

index.html imports:

<script src="node_modules/es6-shim/es6-shim.min.js"></script>
<script src="node_modules/systemjs/dist/system-polyfills.js"></script>

<script src="node_modules/angular2/bundles/angular2-polyfills.js"></script>
<script src="node_modules/systemjs/dist/system.src.js"></script>
<script src="node_modules/rxjs/bundles/Rx.js"></script>
<script src="node_modules/angular2/bundles/angular2.dev.js"></script>
<script src="node_modules/angular2/bundles/router.dev.js"></script>
<script src="node_modules/angular2/bundles/http.dev.js"></script>
Alexander Abakumov
  • 13,617
  • 16
  • 88
  • 129
Tony Krøger
  • 988
  • 1
  • 10
  • 11

6 Answers6

85

As @Adam and @Ploppy mentioned, the Observable.interval() is now deprecated not the preferred way of creating such an observable. The preferred way of doing this is via the IntervalObservable or TimerObservable. [currently in Typscript 2.5.2, rxjs 5.4.3, Angular 4.0.0]

I wanted to add some usage to this answer to demonstrate what I found the best way of doing this in the Angular 2 framework.

First your service (created in angular cli via the 'ng g service MyExample" command). Assuming the service is RESTful (http get request returns a json):

my-example.service.ts

import { Injectable } from '@angular/core';
import { Http, Response} from "@angular/http";
import { MyDataModel } from "./my-data-model";
import { Observable } from "rxjs";
import 'rxjs/Rx';

@Injectable()
export class MyExampleService {
  private url = 'http://localhost:3000'; // full uri of the service to consume here

  constructor(private http: Http) { }

  get(): Observable<MyDataModel>{
    return this.http
      .get(this.url)
      .map((res: Response) => res.json());
  }
}

*** see bottom updates to service for Angular 5 ***

Now your component code ('ng g component MyExample'):

my-example.component.ts:

import { Component, OnDestroy, OnInit } from '@angular/core';
import { MyDataModel } from "../my-data-model";
import { MyExampleService } from "../my-example.service";
import { Observable } from "rxjs";
import { IntervalObservable } from "rxjs/observable/IntervalObservable";
import 'rxjs/add/operator/takeWhile';

@Component({
  selector: 'app-my-example',
  templateUrl: './my-example.component.html',
  styleUrls: ['./my-example.component.css']
})
export class MyExampleComponent implements OnInit, OnDestroy {
  private data: MyDataModel;
  private display: boolean; // whether to display info in the component
                            // use *ngIf="display" in your html to take
                            // advantage of this

  private alive: boolean; // used to unsubscribe from the IntervalObservable
                          // when OnDestroy is called.

  constructor(private myExampleService: MyExampleService) {
    this.display = false;
    this.alive = true;
  }

  ngOnInit() {
    // get our data immediately when the component inits
    this.myExampleService.get()
      .first() // only gets fired once
      .subscribe((data) => {
        this.data = data;
        this.display = true;
      });

    // get our data every subsequent 10 seconds
    IntervalObservable.create(10000)
      .takeWhile(() => this.alive) // only fires when component is alive
      .subscribe(() => {
        this.myExampleService.get()
          .subscribe(data => {
            this.data = data;
          });
      });
  }

  ngOnDestroy(){
    this.alive = false; // switches your IntervalObservable off
  }
}

=== edit ===

Updated the component ts code to consolidate the subscriptions via a TimerObservable:

import { Component, OnDestroy, OnInit } from '@angular/core';
import { MyDataModel } from "../my-data-model";
import { MyExampleService } from "../my-example.service";
import { Observable } from "rxjs";
import { TimerObservable } from "rxjs/observable/TimerObservable";
import 'rxjs/add/operator/takeWhile';

@Component({
  selector: 'app-my-example',
  templateUrl: './my-example.component.html',
  styleUrls: ['./my-example.component.css']
})
export class MyExampleComponent implements OnInit, OnDestroy {
  private data: MyDataModel;
  private display: boolean; // whether to display info in the component
                            // use *ngIf="display" in your html to take
                            // advantage of this

  private alive: boolean; // used to unsubscribe from the TimerObservable
                          // when OnDestroy is called.
  private interval: number;

  constructor(private myExampleService: MyExampleService) {
    this.display = false;
    this.alive = true;
    this.interval = 10000;
  }

  ngOnInit() {
    TimerObservable.create(0, this.interval)
      .takeWhile(() => this.alive)
      .subscribe(() => {
        this.myExampleService.get()
          .subscribe((data) => {
            this.data = data;
            if(!this.display){
              this.display = true;
            }
          });
      });
  }

  ngOnDestroy(){
    this.alive = false; // switches your TimerObservable off
  }
}

=== edit ===

my-example-service.ts (using the HttpClient a la Angular 5):

import { Injectable } from '@angular/core';
import { HttpClient} from "@angular/common/http";
import { MyDataModel } from "./my-data-model";
import { Observable } from "rxjs";
import 'rxjs/Rx';

@Injectable()
export class MyExampleService {
  private url = 'http://localhost:3000'; // full uri of the service to consume here

  constructor(private http: HttpClient) { }

  get(): Observable<MyDataModel>{
    return this.http
      .get<MyDataModel>(this.url);
  }
}

Note change to use the HttpClient rather than Http (deprecated in angular5) and the get method which allows for parsing the response into our data model without having to employ the rxjs .map() operator. While the service changes for angular 5, the component code remains unchanged.

ZackDeRose
  • 2,146
  • 1
  • 16
  • 28
  • 1
    I'm a bit curious about why you are using the alive boolean instead of unsubscribing from the observable on ngOnDestroy() ? – Tony Krøger Jul 04 '17 at 11:00
  • I was following advice of [link](https://medium.com/@benlesh/rxjs-dont-unsubscribe-6753ed4fda87) and [link](http://brianflove.com/2016/12/11/anguar-2-unsubscribe-observables/). Worth reading through if interested. My takeaway was that the takeWhile() operator is preferable to unsubscribing since it will terminate the Observable. – ZackDeRose Jul 13 '17 at 17:04
  • Thank you! Really liked the takeWhile pattern. Did not know it unsubscribed from the observable. And much prefer it to making a collection of subscriptions! – Tony Krøger Jul 25 '17 at 07:40
  • Thanks, this detailed answer saved a lot of my time! – Soumya Kanti Jul 25 '17 at 09:32
  • 3
    Im getting property timer does not exist on Observable.. do you know why? – obey Aug 31 '17 at 11:26
  • @ZackDeRose thanks for your answer, what would it be the proper way to test this code? I asked a question here https://stackoverflow.com/questions/46546628/how-to-properly-test-a-method-which-calls-intervalobservable – Francesco Borzi Oct 03 '17 at 14:09
  • im confused. Original answer says interval is deprecated but then goes on to use timer in the "updated" code. If interval is deprecated wouldnt timer also be deprecated since they both do the same thing? – Curtis Oct 17 '17 at 17:04
  • @obey - Most likely it has something to do with your rxjs version. Should be able to see your version in your package.json file. Check the my latest update to the answer to avoid using the Observable.timer property – ZackDeRose Oct 27 '17 at 11:33
  • @Curtis deprecated is probably the wrong word here. I'd say instead that the IntervalObservable.create() is preferable when using typescript as it takes advantage of the inheritance that TypeScript offers. I don't know if there's much of a function difference though. See my recent update for a solution that uses TimerObservable subclass instead of the Observable.timer property. – ZackDeRose Oct 27 '17 at 11:44
  • seems this will generate new subscription each time the timer fires – godblessstrawberry May 07 '19 at 15:25
  • This seems to be an ever-changing space. First came `Observable.interval`, then (as reflected in this answer) came `IntervalObservable` and then in RxJS 6 the old observable classes [were removed](https://github.com/ReactiveX/rxjs/blob/master/docs_app/content/guide/v6/migration.md#observable-classes) and replaced with functions like [`interval`](https://rxjs-dev.firebaseapp.com/api/index/function/interval) and `timer`. – George Hawkins Mar 25 '20 at 14:13
31

You can use the interval method of Observable within Angular2.

import {Component,Input} from 'angular2/core';
import {Observable} from 'rxjs/Rx';

@Component({
  selector: 'my-app',
  template: `
    <div>
      {{message}}
    </div>
  `
})
export class AppComponent {
  constructor() {
    Observable.interval(500)
          .take(10).map((x) => x+1)
          .subscribe((x) => {
            this.message = x;
          }):
  }
}

Here is the corresponding plunkr describing this: https://plnkr.co/edit/pVMEbbGSzMwSBS4XEXJI?p=preview.

Based on this, you can plug your HTTP request:

initializePolling() {
  return Observable
     .interval(60000)
     .flatMap(() => {
       return this.dataService.getData('url'));
     });
}
Thierry Templier
  • 198,364
  • 44
  • 396
  • 360
  • 2
    Thank you for your quick response and the plunker example was really helpfull. And I got it working locally on a new project with the same cdn imports as you have. But on my main project this still fails, with: TypeError: Observable_1.Observable.interval is not a function, So im trying to change on my imports to match yours. npm does not seem to have rxjs-beta3 for instance. – Tony Krøger Feb 10 '16 at 14:47
  • In fact angular2@2.0.0-beta.3 requires rxjs@5.0.0-beta.0. I made a test with these versions and it works on my side... – Thierry Templier Feb 10 '16 at 15:01
  • Which versions do you use? – Thierry Templier Feb 10 '16 at 15:01
  • Found the error: automatic import with Webstorm failed. instead of import {Observable} from "rxjs/Rx"; which is correct it added a import with import {Observable} from "rxjs/Observable"; – Tony Krøger Feb 10 '16 at 15:18
  • And thanks again, the initializePolling part really helped a lot aswell :) – Tony Krøger Feb 10 '16 at 15:23
  • Is there a way to run the interval first time before waiting the amount of time? – Tony Krøger Feb 10 '16 at 16:15
  • You could try the `startWith` operator: `Observable.interval(1000).startWith(0)...` – Thierry Templier Feb 10 '16 at 17:02
  • 1
    I think this link could help you: https://gist.github.com/staltz/868e7e9bc2a7b8c1f754. Especially in the section "The refresh button". – Thierry Templier Feb 10 '16 at 17:46
20

I think this answer is not valid anymore due to recent changes of rxjs/observable You now have to use IntervalObservable.

https://github.com/ReactiveX/rxjs/blob/master/src/observable/IntervalObservable.ts

import { IntervalObservable } from 'rxjs/observable/IntervalObservable';

@Component({
  ...
})
export class AppComponent {
  n: number = 0;
  constructor() {
    IntervalObservable.create(1000).subscribe(n => this.n = n);
  }
}
juzzlin
  • 45,029
  • 5
  • 38
  • 50
Ploppy
  • 14,810
  • 6
  • 41
  • 58
  • 1
    How do you subscribe to an observable returned by a service with this code ? – Jérémy JOKE May 24 '17 at 08:08
  • I'm not author, but I simple wrap my code subscribe with this IntervalObservable subscribe something linke this: `IntervalObservable.create(10000).subscribe(response => { this.notificationService.getNotification().subscribe(notification => { this.notificationService.notification = notification; }); });` – Denis Savenko Mar 26 '18 at 04:12
8

For TypeScript (1.8.10 at time of answer)/angular2 (rc1 at time of answer) with rxjs@5.0.0 (beta.6 at time of answer) you need to use the IntervalObservable which extends the Observable class

import {IntervalObservable} from 'rxjs/observable/IntervalObservable'

IntervalObservable.create(5000).take(10).map((x) => x + 1)
juzzlin
  • 45,029
  • 5
  • 38
  • 50
Adam
  • 2,082
  • 2
  • 19
  • 17
  • 2
    `rxjs/observable/IntervalObservable` Small case letter. Can you give me an example on how to use this for server polling in Angular2 services? – Sulejman Sarajlija Jun 01 '16 at 19:43
  • @sulejman I figure you found it out, but still: IntervalObservable.create(1000).subscribe(() => { this.service.get().subscribe(x => this.x = x) }); // this will do the trick – yusijs Nov 07 '16 at 14:02
  • 1
    sorry but how exactly would we use this with our HTTP Observable ? – Motassem Kassab Feb 02 '17 at 12:54
3

Disaclaimer: this was originaly an edit for another answer, but contains too many changes.

This can be easily done via switchMap:

Observable.timer(0, 5000)
  .switchMap(() => this.http.get(...).pipe(...)
  .subscribe(...)

Or in RxJS 6 syntax:

import { timer } from 'rxjs';
import { switchMap } from 'rxjs/operators';

timer(0, 5000) // repeats every 5 seconds
  .pipe(switchMap(() => this.http.get(...).pipe(...))
  .subscribe(...);

You can even use a interval instead of a timer:

import { interval } from 'rxjs';
import { switchMap } from 'rxjs/operators';

interval(5000) // repeats every 5 seconds
  .pipe(switchMap(() => this.http.get(...).pipe(...))
  .subscribe(...);
Machado
  • 8,965
  • 6
  • 43
  • 46
2

this can be easily done via switchMap

Observable.timer(0, 5000)
          .switchMap((t) =>
            this.http.get(...).pipe(
                catchError(...)
            )
          )
          .subscribe(...)
godblessstrawberry
  • 4,556
  • 2
  • 40
  • 58