I'd like to crawl data over SSH across a server cluster with NodeJS. The remote scripts output JSON, which is then parsed and split into an object stream.
My problem is that the callback-oriented libraries I use (SSH2, MySQL) lead to a callback pattern that I find hard to reconcile with the Readable API spec. How do I implement _read(size) when the data to push sits behind a bunch of callbacks?
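For illustration, here is a stripped-down sketch of the shape I'm struggling with. fetchRows below is a made-up stand-in for the nested SSH2/MySQL callbacks, not a real API:

var util = require('util');
var Readable = require('stream').Readable;

function AsyncSource(opts) {
  Readable.call(this, { objectMode: true });
}
util.inherits(AsyncSource, Readable);

AsyncSource.prototype._read = function(size) {
  var self = this;
  // _read returns synchronously, but the rows only show up later,
  // deep inside a callback -- is pushing from there the right move?
  fetchRows(function(err, rows) {
    if (err) {
      self.emit('error', err);
      return;
    }
    rows.forEach(function(row) {
      self.push(row);
    });
    // Ending the stream with push(null) would also have to happen
    // inside some callback, once I know everything has been fetched.
  });
};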
My current implementation takes advantage of the fact that Streams are also EventEmitters. I start populating my data when the Stream instance is constructed, and when all my callbacks are done I emit an event. I listen for that custom event, and only then do I start pushing data down the pipe chain.
// Calling code
var stream = new CrawlerStream(argsForTheStream);
stream.on('queue_completed', function() {
  stream
    .pipe(logger)
    .pipe(dbWriter)
    .on('end', function() {
      // Close db connection etc...
    });
});
A mock of the CrawlerStream would be:
// Mock of the Readable stream implementation
var util = require('util');
var Readable = require('stream').Readable;

function CrawlerStream(args) {
  // boilerplate
  Readable.call(this, { objectMode: true });

  // array holding the data to push
  this.data = [];

  // semicolon-separated string of commands
  var cmdQueue = getCommandQueue();

  var self = this;
  db.query(sql, function(err, sitesToCrawl, fields) {
    var servers = groupSitesByServer(sitesToCrawl);
    for (var s in servers) {
      sshConnect(getRemoteServer(s), function(err, conn) {
        sshExec({
          ssh: conn,
          cmd: cmdQueue
        }, function(err, stdout, stderr) {
          // Stdout is parsed as JSON
          // Finally I can populate self.data!
          // Check if all servers are done
          // If I'm the last callback to execute
          self.data.push(null);
          self.emit('queue_completed');
        });
      });
    }
  });
}
util.inherits(CrawlerStream, Readable);

CrawlerStream.prototype._read = function(size) {
  while (this.data.length) {
    this.push(this.data.shift());
  }
};
I'm unsure whether this is the idiomatic way to accomplish this and would like to get your advice.
Please note in your answers that I'd like to retain the vanilla NodeJS style of using callbacks (no promises) and that I'm stuck with ES5.
Thanks for your time!