0

I have a streaming web service running on localhost:8080/stream, which responds whenever a new message is added to the subscribed MQTT stream. I want to consume this web service in my Angular2 app. I'm using RxJS to consume NodeJS APIs in Angular2. I tried the following code, which calls localhost:8080/stream once and then the response ends. I want my observable to listen to the web service continuously.

var headers = new Headers();
headers.append('Access-Control-Allow-Origin', '*');
headers.append('Content-Type', 'application/json');

let options = new RequestOptions({ headers: headers }); // Create a request option
return this.http.get("http://localhost:8080/stream", options) // ...using a GET request
   .map((res: Response) => res.json()) // ...and calling .json() on the response to return data
   .catch((error: any) => Observable.throw(error.json().error));
netfreak30

3 Answers

1

If I understand your question correctly, you want to consume data from a stream where new messages arrive over time.

To achieve this, you need to subscribe to the observable returned by the service call.

return this.http.get("http://localhost:8080/stream", options) // ...using a GET request
   .map((res: Response) => res.json()) // ...and calling .json() on the response to return data
   .catch((error: any) => Observable.throw(error.json().error))
   .subscribe(result => this.result = result);

The result will be updated as new data arrives, and you can use it however you want.

Note: It is best practice to keep HTTP calls in separate services and subscribe to the service from your component.

For reference, here is an example I worked on for demo purposes.

  1. Create a service for http calls

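import { Injectable } from '@angular/core';
import { Http } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/operator/map';

@Injectable()
export class JsonPlaceHolderService {

    constructor(private http: Http) {}

    // Fetch all posts and parse the response body as JSON
    getAllPosts(): Observable<Post[]> {
        return this.http.get("https://jsonplaceholder.typicode.com/posts")
            .map(res => res.json());
    }

}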

  2. From your component, call the service and listen for changes continuously.

export class PostsComponent implements OnInit {

    jphPosts: Post[];

    title: string = "JsonPlaceHolder's Post data";

    constructor(private _service: JsonPlaceHolderService) {}

    ngOnInit(): void {
        this._service.getAllPosts().subscribe(
            (data) => this.jphPosts = data,
            (err) => console.log(err),
            () => console.log("service call completed")
        );
    }

}

Dheeraj Kumar
0

Streaming data from nodejs to angular with socket.io

This is something that would have been of great use to me when I was trying to do this. The following contains code from a socket.io package for Angular; credit goes to the original author. It is taken from a working solution and may need some tweaking.

Server side

var express = require('express'),
    app     = express(),
    http    = require('http'),
    ioServer    = require('socket.io');

var httpServer = http.createServer(app);
var io = new ioServer();
httpServer.listen(1337, function(){
  console.log('httpServer listening on port 1337');
});

io.attach(httpServer);

io.on('connection', function (socket){
    console.log('Connected socket ' + socket.id);
});


// MQTT subscription (assumes `client` is an MQTT client and `topic` is defined elsewhere)
client.on('connect', function () { 
    client.subscribe(topic, function () { 
        console.log("subscribed to " + topic) 
        client.on('message', function (topic, msg, pkt) { 

            io.sockets.emit("message", {topic: topic, msg: msg, pkt: pkt});

        }); 
    }); 
});
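
One detail about the message handler above: mqtt.js passes the payload (`msg`) to the handler as a `Buffer`, so a browser client will typically receive raw bytes rather than text. Below is a minimal TypeScript sketch of decoding such a payload on the client. It assumes the publisher sends UTF-8 encoded JSON, and `decodePayload` is a hypothetical helper name, not part of mqtt.js or socket.io.

```typescript
// Hypothetical helper: turns the raw bytes of an MQTT payload into a JSON value.
// Assumption: the publisher sends UTF-8 encoded JSON text.
function decodePayload(bytes: ArrayBuffer | Uint8Array): unknown {
  // TextDecoder is available as a global in browsers and in current Node.js
  const text = new TextDecoder('utf-8').decode(bytes);
  return JSON.parse(text);
}
```

Inside a "message" listener this could be called as `decodePayload(dat.msg)`. If the payload is not JSON, `JSON.parse` throws, so a real handler may want a try/catch around it.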

Client Side

Create a custom service in Angular with the following:

import * as io from 'socket.io-client';

Declare these fields:

private ioSocket: any;
private subscribersCounter = 0;

Inside the service class:

constructor() {

    this.ioSocket = io('socketUrl', {});

}

on(eventName: string, callback: Function) {
    this.ioSocket.on(eventName, callback);
}

removeListener(eventName: string, callback?: Function) {
    return this.ioSocket.removeListener.apply(this.ioSocket, arguments);
}

fromEvent<T>(eventName: string): Observable<T> {
    this.subscribersCounter++;
    return Observable.create((observer: any) => {
        this.ioSocket.on(eventName, (data: T) => {
            observer.next(data);
        });
        return () => {
            if (this.subscribersCounter === 1) {
                this.ioSocket.removeListener(eventName);
            }
        };
    }).share();
}

In your component, import customService as service:

service.on("message", dat => {
 console.log(dat);
});
shaN
0

You should use a WebSocket in Angular and have it connect to your service URL. Then listen to its events (open, close, message) and create your own Subject stream with RxJS to push new data to the subscribers.

Please check the URL below:

https://medium.com/@lwojciechowski/websockets-with-angular2-and-rxjs-8b6c5be02fac
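
A rough sketch of that idea in TypeScript, under some loud assumptions. To keep it free of dependencies, a small hand-rolled `MiniSubject` stands in for the RxJS `Subject`; in an Angular app you would import `Subject` from RxJS instead. `SocketLike` and `streamFrom` are hypothetical names, and the sketch assumes every frame carries JSON text.

```typescript
type Listener<T> = (value: T) => void;

// Minimal stand-in for an RxJS Subject (assumption: used only to avoid a dependency).
class MiniSubject<T> {
  private listeners: Listener<T>[] = [];

  // Register a listener and return a function that unregisters it.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }

  // Push a value to every currently registered listener.
  next(value: T): void {
    this.listeners.slice().forEach(l => l(value));
  }
}

// The subset of the browser WebSocket API that this sketch relies on.
interface SocketLike {
  onopen: ((ev: any) => void) | null;
  onmessage: ((ev: any) => void) | null;
  onclose: ((ev: any) => void) | null;
}

// Hypothetical helper: wires the socket events to a subject and returns it.
function streamFrom<T>(socket: SocketLike): MiniSubject<T> {
  const messages = new MiniSubject<T>();
  socket.onopen = () => console.log('stream opened');
  // Assumption: each frame is a JSON string
  socket.onmessage = ev => messages.next(JSON.parse(ev.data) as T);
  socket.onclose = () => console.log('stream closed');
  return messages;
}
```

In the browser, the socket passed in would be something like `new WebSocket('ws://localhost:8080/stream')`, which only works if the server speaks the WebSocket protocol at that URL. Components then call `subscribe` on the returned subject and invoke the returned function when they are destroyed.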

Nour