
OK, so I'm a complete beginner with Rx, and unfortunately very new to JavaScript and to streams in JavaScript as well. I'm using https://github.com/trygve-lie/twitter-stream-api to connect to Twitter's streaming API and receive JSON objects containing tweets. So far I have this code:

var Rx = require('rxjs/Rx');

var TwitterStream = require('twitter-stream-api'),
    fs = require('fs');
var filter = 'tweet';
var keys = {
    consumer_key : "key",
    consumer_secret : "secret",
    token : "token",
    token_secret : "tokensecret"
};

var Twitter = new TwitterStream(keys);
Twitter.stream('statuses/filter', {
    track: filter
});

Twitter.on('connection success', function (uri) {
    console.log('connection success', uri); 
});
Twitter.on('data', function (obj) {
    console.log(obj.text);
});

I am successfully writing tweets to the console, but what I am really trying to learn is working with streams, and RxJS in particular. I have tried every way I can think of to create an observable: Rx.Observable.create, Rx.Observable.from, etc.

I have also tried calling Twitter.resume() to resume the stream, since it is apparently paused by default, and then observing that. I only get errors such as ".subscribe is not a function". Given the code above, how can I use Rx.Observable to start filtering and playing around with the data?

Thanks!

JimJimL

2 Answers


RxJS 5 doesn't have a method for converting a stream to or from an Observable, so you'll need to do it yourself, ideally with Observable.create.

const Rx = require('rxjs');
const Observable = Rx.Observable;

var TwitterStream = require('twitter-stream-api'),

...

var source$ = Observable.create(observer => {
  var Twitter = new TwitterStream(keys);
  Twitter.stream('statuses/filter', {
    track: filter
  });

  Twitter.on('data', function (obj) {
    observer.next(obj);
  });

  return () => {
    Twitter.close();
  };
});

This creates a cold Observable that connects to Twitter only when you subscribe to it. The static method Observable.create lets you push values to the observer and, at the end, return a teardown function that simply closes the connection. This function is called when you unsubscribe or when the Observable completes.
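To make the subscribe/teardown contract concrete, here is a minimal sketch that runs without RxJS or Twitter credentials. It assumes a plain Node EventEmitter standing in for the Twitter client and a plain object with a next method standing in for the Rx observer; emitterSubscriber, fake and received are hypothetical names, not part of either library.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: builds the kind of function Observable.create expects.
// Nothing is attached to the emitter until the returned function is called,
// which is what makes the resulting Observable "cold".
function emitterSubscriber(emitter, eventName) {
  return observer => {
    const handler = value => observer.next(value);
    emitter.on(eventName, handler);
    // Teardown: detach the listener again on unsubscribe.
    return () => emitter.removeListener(eventName, handler);
  };
}

// Stand-ins (assumptions): a bare EventEmitter instead of the Twitter client,
// and a plain object instead of the observer that RxJS would pass in.
const fake = new EventEmitter();
const received = [];
const teardown = emitterSubscriber(fake, 'data')({
  next: tweet => received.push(tweet.text)
});

fake.emit('data', { text: 'first' });  // delivered to the observer
teardown();                            // simulates unsubscribe
fake.emit('data', { text: 'second' }); // ignored, listener was removed
```

With RxJS 5 the same function could presumably be handed over as Rx.Observable.create(emitterSubscriber(Twitter, 'data')), with Twitter.close() added to the teardown if the connection should be closed as well.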

Then you can chain this Observable with whatever you want:

source$.filter(...).map(...)
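The callbacks passed to filter and map are ordinary functions, so they can be tried out on an array of sample objects before wiring them to the live stream. A small sketch, assuming tweets that carry only the text field seen in the question; sampleTweets, mentionsRx and toText are hypothetical names.

```javascript
// Hypothetical sample data; only the `text` field appears in the question.
const sampleTweets = [
  { text: 'Learning RxJS today' },
  { text: 'Nothing to see here' },
  { text: 'rxjs streams are fun' }
];

// Predicate for filter: keep tweets whose text mentions "rxjs" (case-insensitive).
const mentionsRx = tweet => /rxjs/i.test(tweet.text);

// Projection for map: reduce each tweet object to its text.
const toText = tweet => tweet.text;

// Array#filter and Array#map accept the same callbacks, which makes for a quick dry run.
const result = sampleTweets.filter(mentionsRx).map(toText);
console.log(result);
```

With RxJS 5 the same callbacks would then be chained as source$.filter(mentionsRx).map(toText).subscribe(console.log).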

Note that there are also the methods Observable.bindCallback() and Observable.bindNodeCallback(), but these wouldn't help much in your situation.


martin
  • Thanks a lot for your answer! I haven't had time to try it properly yet. Quick question, as I get 'Observable is not defined': isn't it supposed to be Rx.Observable.create()? Or do I need to require (import) something more? – JimJimL Mar 08 '17 at 10:40
  • @JimJimL See the code now, I added also the `require()` calls. – martin Mar 08 '17 at 12:17

Here's an example using desmondmorris/node-twitter and rxjs 5.

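const Observable = require('rxjs').Observable;
const Twitter = require('twitter'); // desmondmorris/node-twitter

Observable
  .of(new Twitter({
    consumer_key: 'xxxx',
    consumer_secret: 'xxxx',
    access_token_key: 'xxxx',
    access_token_secret: 'xxxx',
  }))
  // Turn the 'data' events of the stream returned by twitter.stream() into values
  .mergeMap(twitter =>
    Observable.fromEvent(
      twitter.stream('statuses/filter', { track: 'Stack Overflow' }),
      'data'))
  // followers_count is the field name in Twitter's user object;
  // the guard skips messages that carry no user
  .filter(tweet => tweet.user && tweet.user.followers_count > 10000)
  .subscribe(console.log);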
Enki