It sounds like you solved your problem, but to answer your original question:
I'd like to force an observer change to complete early, because my test code knows how many lines should be in the file. How do I do this?
In general, the use of Subjects is discouraged when you have better alternatives, since they tend to be a crutch that lets people keep using the programming styles they are familiar with. Instead of trying to use a Subject, I would suggest that you think about what each event means in an Observable's life cycle.
Wrap Event Emitters
A wrapper for the EventEmitter#on/off pattern already exists in the form of Observable.fromEvent. It handles clean up and keeps the listener attached only while there are subscribers. Thus ObserveTail can be refactored into:
    function ObserveTail(filename) {
      return Rx.Observable.create(function(observer) {
        var lineSep = /[\r]{0,1}\n/;
        //Declared with var so that tail does not leak into the global scope
        var tail = new Tail(filename, lineSep, {}, true);
        var line = Rx.Observable.fromEvent(tail, "line");
        var close = Rx.Observable.fromEvent(tail, "close");
        var error = Rx.Observable.fromEvent(tail, "error")
          .flatMap(function(err) { return Rx.Observable.throw(err); });
        //Only take events until close occurs and merge in the error for good measure
        //The latter two are terminal events in this case.
        return line.takeUntil(close).merge(error).subscribe(observer);
      });
    }
This has several benefits over the vanilla use of Subjects: first, you will now actually see the error downstream, and second, it will handle clean up of your event listeners when you are done with them.
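To make the clean-up point concrete, here is a minimal sketch of the attach-on-subscribe, detach-on-dispose behaviour described above. It is not how Rx implements fromEvent; fromEventSketch is a hypothetical helper built only on Node's EventEmitter, and the event name and values are made up for illustration.

```javascript
var EventEmitter = require("events").EventEmitter;

// Hypothetical helper: a hand-rolled stand-in for the idea behind fromEvent.
// It returns a subscribe function; calling that attaches a listener and
// hands back a dispose function that removes the listener again.
function fromEventSketch(emitter, eventName) {
  return function subscribe(onNext) {
    emitter.on(eventName, onNext);
    return function dispose() {
      emitter.removeListener(eventName, onNext);
    };
  };
}

var emitter = new EventEmitter();
var received = [];
var lines = fromEventSketch(emitter, "line");

// Nothing is attached until someone subscribes.
var countBefore = emitter.listenerCount("line");

var dispose = lines(function (line) { received.push(line); });
emitter.emit("line", "first");
var countDuring = emitter.listenerCount("line");

// Disposing removes the listener, so later events are not delivered.
dispose();
emitter.emit("line", "second");
var countAfter = emitter.listenerCount("line");
```

The subscriber never touches the emitter directly; it only holds a dispose function, which is the same separation the refactored ObserveTail relies on.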
Avoid *Sync Methods
Then this can be rolled into your file existence checking without the use of readSync:

    //If it doesn't exist then we are done here
    //You could also throw from the filter if you want an error tracked
    //fs.exists calls back with a single boolean and no error argument,
    //so fromCallback is used here rather than fromNodeCallback
    var source = Rx.Observable.fromCallback(fs.exists)(filename)
      .filter(function(exists) { return exists; })
      .flatMap(ObserveTail(filename));
Next you can simplify your filter/map/map sequence by using flatMap instead:
    var result = source.flatMap(function(x) {
        try {
          return Rx.Observable.just(JSON.parse(x));
        } catch (e) {
          //Lines that are not valid JSON are silently dropped
          return Rx.Observable.empty();
        }
      },
      //This allows you to map the result of the parsed value
      function(x, json) {
        return json.name;
      })
      .timeout(10000, "observer timed out");
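As a rough array-based analogy for the flatMap step above (not Rx code), the sketch below shows the same "parse, drop what fails, then project" logic on a plain list of lines. namesFromLines and the sample input are hypothetical and exist only for illustration.

```javascript
// Hypothetical helper: mirrors the flatMap above on a plain array.
// Lines that fail to parse are skipped (the Observable.empty() branch);
// lines that parse are projected to their name field (the result selector).
function namesFromLines(lines) {
  var names = [];
  lines.forEach(function (line) {
    var parsed;
    try {
      parsed = JSON.parse(line);
    } catch (e) {
      return; // skip this line, emit nothing for it
    }
    names.push(parsed.name);
  });
  return names;
}

var names = namesFromLines([
  '{"name": "AssetMgr"}',
  "not json at all",
  '{"name": "Other"}'
]);
```

Each input produces zero or one outputs, which is why a single flatMap can stand in for the separate filter and map steps.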
Don't signal, unsubscribe
How do you "signal" a stop when streams only travel in one direction? We rarely want an Observer to communicate directly with an Observable, so a better pattern is not to "signal" a stop at all but simply to unsubscribe from the Observable and leave it up to the Observable's behavior to determine what it should do from there.
Essentially, your Observer really shouldn't care about your Observable beyond being able to say "I'm done here".
To do that you need to declare the condition you want to reach before stopping.
In this case, since you are simply stopping after a set number of lines in your test case, you can use take to unsubscribe. Thus the final subscribe block would look like:
    result
      //After lines is reached this will complete.
      .take(lines)
      .subscribe(
        function(name) {
          tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
        },
        function(err) {
          console.error(err);
          tid.fail("err leaked through to subscriber");
        },
        function() {
          tid.end();
          console.log("Completed");
        }
      );
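The "unsubscribe instead of signalling" idea behind take can be sketched without Rx. The code below is only an illustration under stated assumptions: takeSketch is a hypothetical helper on top of Node's EventEmitter, and it is not how Rx implements take.

```javascript
var EventEmitter = require("events").EventEmitter;

// Hypothetical helper: listens for eventName and detaches itself after
// `count` values, then reports completion. The emitter is never told to
// stop; the consumer simply stops listening.
function takeSketch(emitter, eventName, count, onNext, onCompleted) {
  if (count <= 0) {
    onCompleted();
    return;
  }
  var seen = 0;
  function listener(value) {
    seen += 1;
    onNext(value);
    if (seen === count) {
      emitter.removeListener(eventName, listener);
      onCompleted();
    }
  }
  emitter.on(eventName, listener);
}

var source = new EventEmitter();
var taken = [];
var completed = false;

takeSketch(source, "line", 2,
  function (line) { taken.push(line); },
  function () { completed = true; });

source.emit("line", "a");
source.emit("line", "b");
source.emit("line", "c"); // nobody is listening any more

var remainingListeners = source.listenerCount("line");
```

The source keeps emitting as it pleases; the third value is simply never observed because the listener is already gone.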
Edit 1
As pointed out in the comments, in the case of this particular API there isn't a real "close" event, since Tail is essentially an infinite operation. In this sense it is no different from a mouse event handler: we stop sending events when people stop listening. So your block would probably end up looking like:
    function ObserveTail(filename) {
      return Rx.Observable.create(function(observer) {
        var lineSep = /[\r]{0,1}\n/;
        //Declared with var so that tail does not leak into the global scope
        var tail = new Tail(filename, lineSep, {}, true);
        var line = Rx.Observable.fromEvent(tail, "line");
        var error = Rx.Observable.fromEvent(tail, "error")
          .flatMap(function(err) { return Rx.Observable.throw(err); });
        //There is no close event here, so unwatch the file once the line
        //stream is torn down, and merge in the error as the terminal event.
        return line
          .finally(function() { tail.unwatch(); })
          .merge(error).subscribe(observer);
      }).share();
    }
The addition of the finally and the share operators creates an object which will attach to the tail when a new subscriber arrives and will remain attached as long as there is at least one subscriber still listening. Once all the subscribers are done, however, we can safely unwatch the tail.
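The reference-counting behaviour described here can be sketched in a few lines. This is an illustration of the idea only, not the implementation of share; refCountSketch and the "watch"/"unwatch" log entries are hypothetical stand-ins for attaching to and detaching from the tail.

```javascript
// Hypothetical helper: calls connect() for the first subscriber and
// disconnect() once the last remaining subscriber has disposed.
function refCountSketch(connect, disconnect) {
  var subscribers = 0;
  return function subscribe() {
    if (subscribers === 0) {
      connect();
    }
    subscribers += 1;
    var disposed = false;
    return function dispose() {
      if (disposed) { return; } // disposing twice must not double-count
      disposed = true;
      subscribers -= 1;
      if (subscribers === 0) {
        disconnect();
      }
    };
  };
}

var log = [];
var subscribe = refCountSketch(
  function () { log.push("watch"); },
  function () { log.push("unwatch"); }
);

var disposeA = subscribe(); // first subscriber: attaches
var disposeB = subscribe(); // second subscriber: reuses the attachment
disposeA();                 // one subscriber left: stay attached
var logWhileShared = log.slice();
disposeB();                 // last subscriber gone: detach
```

Two subscribers share a single "watch", and the "unwatch" only happens after both have gone away.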