
I've got an RxJS observer (really a Subject) that tails a file forever, just like tail -f. It's awesome for monitoring log files, for example.

This "forever" behavior is great for my application, but terrible for testing. Currently my application works but my tests hang forever.

I'd like to force an observer chain to complete early, because my test code knows how many lines should be in the file. How do I do this?

I tried calling onCompleted on the Subject handle I returned, but at that point it's basically cast as an observer and I can't force it to close. The error is:

Object #<ObserveTail> has no method 'onCompleted'

Here's the source code:

var fs = require('fs');
var Rx = require('rx');
var Tail = require('tail').Tail;

function ObserveTail(filename) {
    var source = new Rx.Subject();

    if (!fs.existsSync(filename)) {
        console.error("file doesn't exist: " + filename);
    }

    var lineSep = /[\r]{0,1}\n/;
    var tail = new Tail(filename, lineSep, {}, true);

    tail.on("line", function(line) {
        source.onNext(line);
    });
    tail.on('close', function(data) {
        console.log("tail closed");
        source.onCompleted();
    });
    tail.on('error', function(error) {
        console.error(error);
    });

    this.source = source;
}

And here's the test code (a tape-style test), which can't figure out how to force "forever" to end. Note the "ILLEGAL" line:

var test = require('tape');

test('tailing a file works correctly', function(tid) {

    var lines = 8;
    var i = 0;
    var filename = 'tape/tail.json';
    var handle = new ObserveTail(filename);
    touch(filename);

    handle.source
        .filter(function (x) {
            try {
                JSON.parse(x);
                return true;
            } catch (error) {
                tid.pass("correctly caught illegal JSON");
                return false;
            }
        })
        .map(function(x) { return JSON.parse(x); })
        .map(function(j) { return j.name; })
        .timeout(10000, "observer timed out")
        .subscribe(
            function(name) {
                tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
                i++;
                if (i >= lines) {
                    handle.onCompleted();   // XXX ILLEGAL
                }
            },
            function(err) {
                console.error(err);
                tid.fail("err leaked through to subscriber");
            },
            function() {
                tid.end();
                console.log("Completed");
            }
        );

});
Paul S
  • Use your favorite debugger and have a look at `handle`. You should find your mistake. The more time you spend debugging by yourself, the better you become at debugging, and it is a really valuable skill. Otherwise, JavaScript is not a typed language, which is a shame, so you have to pay attention to the objects that you pass from one place to another, as there will be no warning from the compiler when you use them incorrectly. It takes some practice and a lot of mistakes to get used to it, so the more experience the better. – user3743222 Jan 28 '16 at 03:13
  • but is this the correct pattern for solving this problem? – Paul S Jan 28 '16 at 03:31
  • Okay, it's handle.source.onCompleted(). Wish I could get the exception handler in javascript to completely print out the object in question by default. – Paul S Jan 28 '16 at 03:38
  • But the overall question still stands: is this the correct pattern for "tell the upstream source to complete" in RXJS? I have another upstream source that's on a setTimeout loop (polls a website), and I have to set a magic boolean to tell it to exit the loop. I extended the Subject to do that, but that's probably bad hackery... The reason I'm posting here is that there is a dearth of good real-world examples using RXJS; hoping this will help others. – Paul S Jan 28 '16 at 03:40
  • I don't understand your question. In any case, there are two conditions under which a subject will complete: 1. if you call `onCompleted` yourself, 2. if the subject is used as an observer, and the source of that observer completes. Use whichever is more appropriate. – user3743222 Jan 28 '16 at 13:30
  • Also, there are a few operators that implicitly signal completion, for example `take`, `takeUntil`, `timeout`, etc. They all include some sort of condition that, when realized, leads to the completion of the observable (which leads to `.onCompleted` being called on the associated observer). – user3743222 Jan 28 '16 at 13:31
  • It would be nice to have more real world examples in the documentation for RXJS. All the ones there are magically composed to be brief but don't deal with real world problems like "how do you shut down" or even how to properly use a Subject to interface to traditional emitters, callbacks, and polling loops. I'll be happy to contribute some simplified examples once I'm done with my current sprint. (how about, for example, how to poll a website and emit the results as an RXJS observable?) – Paul S Jan 29 '16 at 18:58
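The two completion paths described in the comments (calling onCompleted yourself, or using the subject as the observer of a source that completes) can be sketched without RxJS. MiniSubject and fromArray below are hypothetical, hand-rolled stand-ins that only mimic the RxJS 4 onNext/onError/onCompleted naming; they are not the rx library's implementation.

```javascript
// Hypothetical stand-in for an RxJS 4 Subject (NOT the rx library's code).
// It is both an observer (onNext/onError/onCompleted) and an observable (subscribe).
class MiniSubject {
  constructor() {
    this.observers = [];
    this.isStopped = false;
  }
  subscribe(observer) {
    this.observers.push(observer);
    // Disposable that detaches this observer.
    return { dispose: () => { this.observers = this.observers.filter(o => o !== observer); } };
  }
  onNext(value) {
    if (!this.isStopped) this.observers.slice().forEach(o => o.onNext(value));
  }
  onError(err) {
    if (this.isStopped) return;
    this.isStopped = true;
    this.observers.slice().forEach(o => o.onError(err));
  }
  onCompleted() {
    if (this.isStopped) return;
    this.isStopped = true;
    this.observers.slice().forEach(o => o.onCompleted());
  }
}

// Hypothetical finite source: pushes its values synchronously, then completes.
function fromArray(values) {
  return {
    subscribe(observer) {
      values.forEach(v => observer.onNext(v));
      observer.onCompleted();
      return { dispose() {} };
    }
  };
}

// Path 1: complete the subject yourself (the subject, not the wrapper object holding it).
const manual = new MiniSubject();
const manualLog = [];
manual.subscribe({
  onNext: v => manualLog.push(v),
  onError: () => manualLog.push('error'),
  onCompleted: () => manualLog.push('done')
});
manual.onNext('a');
manual.onCompleted();
manual.onNext('ignored'); // dropped: the subject has already stopped

// Path 2: the subject observes a source that completes on its own.
const relay = new MiniSubject();
const relayLog = [];
relay.subscribe({
  onNext: v => relayLog.push(v),
  onError: () => relayLog.push('error'),
  onCompleted: () => relayLog.push('done')
});
fromArray([1, 2]).subscribe(relay);
```

In the question's code the subject lives at handle.source, which is why onCompleted has to be called there rather than on handle itself.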

1 Answer


It sounds like you solved your problem, but to address your original question:

I'd like to force an observer chain to complete early, because my test code knows how many lines should be in the file. How do I do this?

In general, the use of Subjects is discouraged when better alternatives exist, since they tend to be a crutch that lets people keep using the programming styles they are already familiar with. Instead of reaching for a Subject, I would suggest thinking about what each event means in an Observable's life cycle.

Wrap Event Emitters

A wrapper for the EventEmitter#on/off pattern already exists in the form of Observable.fromEvent. It handles cleanup, and it keeps the event handler attached only while someone is subscribed. Thus ObserveTail can be refactored into:

function ObserveTail(filename) {

  return Rx.Observable.create(function(observer) {
    var lineSep = /[\r]{0,1}\n/;
    var tail = new Tail(filename, lineSep, {}, true);
    var line = Rx.Observable.fromEvent(tail, "line");
    var close = Rx.Observable.fromEvent(tail, "close");
    var error = Rx.Observable.fromEvent(tail, "error")
                  .flatMap(function(err) { return Rx.Observable.throw(err); });
    //Only take events until close occurs, and merge in the error stream for good measure.
    //Close and error are both terminal events in this case.
    return line.takeUntil(close).merge(error).subscribe(observer);
  });
}

This has several benefits over the vanilla use of Subjects: first, you will now actually see errors downstream, and second, your event handlers will be cleaned up when you are done with them.
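To see what the fromEvent plus takeUntil combination buys you, here is a dependency-free sketch that uses only Node's built-in events module. fromEventSketch and takeUntilSketch are hypothetical simplifications written for illustration, not RxJS's actual operators, and a plain EventEmitter stands in for the Tail instance.

```javascript
const { EventEmitter } = require('events');

// Hypothetical sketch of a fromEvent-style wrapper: attach a listener on
// subscribe, detach it when the subscription is disposed.
function fromEventSketch(emitter, eventName) {
  return {
    subscribe(observer) {
      const handler = value => observer.onNext(value);
      emitter.on(eventName, handler);
      return { dispose: () => emitter.removeListener(eventName, handler) };
    }
  };
}

// Hypothetical sketch of takeUntil: forward source values until `notifier`
// emits, then dispose both inner subscriptions and complete.
function takeUntilSketch(source, notifier) {
  return {
    subscribe(observer) {
      let stopped = false;
      let sourceSub = null;
      let notifierSub = null;
      const stop = () => {
        if (stopped) return;
        stopped = true;
        if (sourceSub) sourceSub.dispose();
        if (notifierSub) notifierSub.dispose();
      };
      notifierSub = notifier.subscribe({
        onNext: () => { stop(); observer.onCompleted(); },
        onError: err => { stop(); observer.onError(err); },
        onCompleted: () => {}
      });
      sourceSub = source.subscribe({
        onNext: v => { if (!stopped) observer.onNext(v); },
        onError: err => { stop(); observer.onError(err); },
        onCompleted: () => { stop(); observer.onCompleted(); }
      });
      return { dispose: stop };
    }
  };
}

// A plain EventEmitter stands in for the Tail instance.
const fakeTail = new EventEmitter();
const lines = [];
let completed = false;
takeUntilSketch(fromEventSketch(fakeTail, 'line'), fromEventSketch(fakeTail, 'close'))
  .subscribe({
    onNext: line => lines.push(line),
    onError: err => { throw err; },
    onCompleted: () => { completed = true; }
  });

fakeTail.emit('line', 'first');
fakeTail.emit('line', 'second');
fakeTail.emit('close');
fakeTail.emit('line', 'after close'); // no listener is attached any more
```

The interesting part is the cleanup: once close fires, both listeners are removed from the emitter, something the hand-managed Subject version never does.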

Avoid *Sync Methods

Then this can be rolled into your file existence check without the use of existsSync:

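//If it doesn't exist then we are done here
//You could also throw from the filter if you want an error tracked
//fs.exists calls back with (exists) and no error argument,
//so it needs fromCallback rather than fromNodeCallback
var source = Rx.Observable.fromCallback(fs.exists)(filename)
    .filter(function(exists) { return exists; })
    .flatMap(function() { return ObserveTail(filename); });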

Next, you can simplify your filter/map/map sequence by using flatMap instead:

var result = source.flatMap(function(x) {
  try {
    return Rx.Observable.just(JSON.parse(x));
  } catch (e) {
    return Rx.Observable.empty();
  }
}, 
//This allows you to map the result of the parsed value
function(x, json) {
  return json.name;
})
.timeout(10000, "observer timed out");
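The trick is that flatMap lets each input produce zero or one outputs, so filtering and parsing happen in one pass instead of parsing every line twice. The same shape can be shown with plain arrays; namesFromLines is a hypothetical helper, and Array.prototype.flatMap requires Node 11 or later.

```javascript
// Hypothetical helper: the "zero or one output per input" idea from the
// flatMap + just/empty version, applied to a plain array of lines.
function namesFromLines(lines) {
  return lines
    .flatMap(line => {
      try {
        return [JSON.parse(line)];   // valid JSON: one output (like Observable.just)
      } catch (e) {
        return [];                   // invalid JSON: no output (like Observable.empty)
      }
    })
    .map(json => json.name);         // plays the role of the resultSelector
}

const names = namesFromLines([
  '{"name": "AssetMgr"}',
  'not json at all',
  '{"name": "AssetMgr", "level": "info"}'
]);
```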

Don't signal, unsubscribe

How do you "signal" a stop when streams only travel in one direction? We rarely want an Observer to communicate directly with an Observable, so a better pattern is not to "signal" a stop at all, but simply to unsubscribe from the Observable and leave it up to the Observable's behavior to determine what it should do from there.

Essentially, your Observer shouldn't care about the Observable beyond being able to say "I'm done here".

To do that, you need to declare the condition under which you want to stop.

In this case, since your test simply stops after a set number of lines, you can use take to unsubscribe. Thus the final subscribe block would look like:

result
    //Once `lines` names have arrived, take completes and unsubscribes upstream.
    .take(lines)
    .subscribe(
        function(name) {
            tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
        },
        function(err) {
            console.error(err);
            tid.fail("err leaked through to subscriber");
        },
        function() {
            tid.end();
            console.log("Completed");
        }
    );
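Stripped of RxJS, the "unsubscribe instead of signal" idea looks roughly like this. makeInfiniteSource and takeSketch are hypothetical, hand-rolled stand-ins (not RxJS's take) for a tail-like source that never completes on its own and for an operator that disposes its upstream subscription after n values.

```javascript
// Hypothetical infinite source: emits 0, 1, 2, ... each time tick() is called,
// until its subscription is disposed. `disposed` records whether teardown ran.
function makeInfiniteSource() {
  const state = { observer: null, next: 0, disposed: false };
  return {
    state,
    tick() { if (state.observer) state.observer.onNext(state.next++); },
    subscribe(observer) {
      state.observer = observer;
      return { dispose() { state.observer = null; state.disposed = true; } };
    }
  };
}

// Hypothetical sketch of take(n): forward n values, then dispose the upstream
// subscription and complete. The observer never "signals" the source.
function takeSketch(source, n) {
  return {
    subscribe(observer) {
      let seen = 0;
      let done = false;
      const upstream = source.subscribe({
        onNext(v) {
          if (done) return;
          seen++;
          observer.onNext(v);
          if (seen >= n) {
            done = true;
            upstream.dispose();   // "I'm done here": tear down the source
            observer.onCompleted();
          }
        },
        onError(err) { if (!done) { done = true; observer.onError(err); } },
        onCompleted() { if (!done) { done = true; observer.onCompleted(); } }
      });
      return { dispose() { done = true; upstream.dispose(); } };
    }
  };
}

const src = makeInfiniteSource();
const received = [];
let completed = false;
takeSketch(src, 3).subscribe({
  onNext: v => received.push(v),
  onError: err => { throw err; },
  onCompleted: () => { completed = true; }
});
for (let i = 0; i < 10; i++) src.tick(); // more ticks than the test needs
```

The source only learns that nobody is listening through its dispose function, which is exactly where something like tail.unwatch() belongs.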

Edit 1

As pointed out in the comments, this particular API has no real "close" event, since Tail is essentially an infinite operation. In this sense it is no different from a mouse event handler: we stop sending events when people stop listening. So your block would probably end up looking like:

function ObserveTail(filename) {

  return Rx.Observable.create(function(observer) {
    var lineSep = /[\r]{0,1}\n/;
    var tail = new Tail(filename, lineSep, {}, true);
    var line = Rx.Observable.fromEvent(tail, "line");
    var error = Rx.Observable.fromEvent(tail, "error")
                  .flatMap(function(err) { return Rx.Observable.throw(err); });
    //There is no close event: stop watching the file whenever the line
    //subscription is disposed, and merge in errors as terminal events.
    return line
            .finally(function() { tail.unwatch(); })
            .merge(error).subscribe(observer);
  }).share();
}

Adding the finally and share operators creates an Observable that attaches to the tail when the first subscriber arrives and remains attached as long as at least one subscriber is still listening. Once all the subscribers are done, however, we can safely unwatch the tail.
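The refcounting behavior of share plus finally can be sketched in plain JavaScript. shareSketch is a hypothetical simplification of what share() (publish plus refCount) does, not RxJS's implementation, and the fake resource with watch/unwatch counters stands in for Tail. Only one underlying watch is created no matter how many subscribers attach, and the teardown runs exactly once, when the last one leaves.

```javascript
// Hypothetical sketch of share() around a resource that needs explicit
// teardown, like Tail's watch/unwatch. Not RxJS's implementation.
function shareSketch(connect) {
  let refCount = 0;
  let connection = null;     // { close } object returned by connect()
  const observers = new Set();
  return {
    subscribe(observer) {
      observers.add(observer);
      refCount++;
      if (refCount === 1) {
        // First subscriber: attach to the underlying resource.
        connection = connect(value => observers.forEach(o => o.onNext(value)));
      }
      let disposed = false;
      return {
        dispose() {
          if (disposed) return;
          disposed = true;
          observers.delete(observer);
          refCount--;
          if (refCount === 0) {
            // Last subscriber left: the equivalent of finally(() => tail.unwatch()).
            connection.close();
            connection = null;
          }
        }
      };
    }
  };
}

// Hypothetical tail-like resource that counts watches and unwatches.
const stats = { watches: 0, unwatches: 0 };
let pushLine = null;
const lines = shareSketch(onLine => {
  stats.watches++;
  pushLine = onLine;
  return { close() { stats.unwatches++; pushLine = null; } };
});

const a = [];
const b = [];
const subA = lines.subscribe({ onNext: v => a.push(v) });
const subB = lines.subscribe({ onNext: v => b.push(v) });
pushLine('x');
subA.dispose();
pushLine('y');            // only B is still listening
const stillWatchingAfterA = stats.unwatches === 0;
subB.dispose();
```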

paulpdaniels
  • Thanks so much, I could not find examples of how to do any of this properly. One issue is that 'Tail' doesn't have a close; it's like tail -f, you have to ctrl-c out of it. That is convenient for making programs run forever that listen on e.g. a logfile, but inconvenient for tests. I will try your example, but I doubt it will exit the test correctly. Basically all the examples, including this one, assume the emitter ends things on its own, but actually I find lots of cases where I want the program using the emitter to force an end. See also the example of polling a website on a timer. – Paul S Feb 01 '16 at 03:58
  • In fact the general assumption in all the examples including the above is that "the source always cleans itself up". This is not the case when polling a website or doing a tail -f on a file. close() effectively only happens under the listener's control. This defies the intent of RXJS but then again the real world works a lot this way. So what's the proper way to handle this? Extend Observable with a close API like I did by using a Subject instead? Add an API by extending the observable object? Or something else? I think this is the root of the issue. – Paul S Feb 01 '16 at 19:10
  • I am assuming you are using [this library](https://github.com/lucagrulla/node-tail)? I would expect the solution would involve calling `unwatch` once all the observers are gone. I'll update my suggested solution. – paulpdaniels Feb 01 '16 at 19:42
  • Yes, call unwatch. But that only works for this case. What about a periodic callback? I'll note https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/callbacks.md doesn't handle periodic callbacks, only single callbacks. An example for a periodic timer would be helpful. – Paul S Feb 01 '16 at 20:51
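For the periodic-callback case, the same idea applies: make the timer part of the subscription, so that disposing the subscription (directly, or through an operator like take) stops the loop, with no magic boolean on the source. RxJS 4's Rx.Observable.interval already provides a timer source that stops when its subscription is disposed. The sketch below is a hypothetical plain-JavaScript illustration, not an RxJS API: pollSketch, its synchronous fetchOnce argument (a real poller would issue the HTTP request asynchronously), and the fake timer helper are all assumptions made so it runs without a network or the rx library.

```javascript
// Hypothetical poller exposed as an observable-like object. The timer is owned
// by the subscription, so disposing the subscription ends the polling loop.
// `timers` is injectable so the sketch can be driven deterministically.
function pollSketch(fetchOnce, periodMs, timers = { setInterval, clearInterval }) {
  return {
    subscribe(observer) {
      const handle = timers.setInterval(() => {
        try {
          observer.onNext(fetchOnce());
        } catch (err) {
          timers.clearInterval(handle);  // a failed poll terminates the stream
          observer.onError(err);
        }
      }, periodMs);
      return { dispose: () => timers.clearInterval(handle) };
    }
  };
}

// Hypothetical fake timers: callbacks run only when tickAll() is called.
function makeFakeTimers() {
  const active = new Map();
  let nextId = 1;
  return {
    setInterval(fn) { const id = nextId++; active.set(id, fn); return id; },
    clearInterval(id) { active.delete(id); },
    tickAll() { [...active.values()].forEach(fn => fn()); },
    activeCount() { return active.size; }
  };
}

const fake = makeFakeTimers();
let polls = 0;
const results = [];
const sub = pollSketch(() => ++polls, 5000, fake).subscribe({
  onNext: v => results.push(v),
  onError: err => { throw err; },
  onCompleted: () => {}
});
fake.tickAll();
fake.tickAll();
sub.dispose();   // the listener decides when polling ends
fake.tickAll();  // no timer is registered any more, so nothing happens
```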