I'm using node.js 8.9.3 in Windows 7. I'm writing a class that reads the first part of a file (the "header") automatically in the constructor using a stream and the byline
npm package. The data in the header is passed to a processing function using event handling and the native data
event in byline
. After the last line in the header of the file is read, the stream should be paused, the existing listener for the data
event should be removed, and another listener added which employs a different processing function. So in essence I'm replacing one processing function with another.
The problem I'm experiencing is that even though I'm attempting to specifically remove the header processing function, and I'm successfully attaching the subsequent processing function, the header function continues to process lines from the data
event after it's been removed. In other words, both the new data processing function and the old header processing function continue to be used on each new line after the header, even after the old processing function was removed using removeListener
Here's my code, with unrelated portions removed:
const fs = require('fs'),
path = require('path'),
byline = require('byline'),
EventEmitter = require('events');
class VCFStream extends EventEmitter{
constructor(vcfPath){
super();
this.header = [];
this.variants = {};
this.stream = byline.createStream(fs.createReadStream(
vcfPath, { encoding: 'utf8' }));
this.stream.on('data', this.headerParse.bind(this));
this.stream.on('end', function(){
console.log('Stream is done');
self.emit('end');
});
}
headerParse(line){
var self = this;
if(line.startsWith('#CHROM')){
//This is the last line of the header.
line.split('\t').splice(9).forEach(function(samp){
self.samples.push(samp);
});
this.stream.pause();
// I've also tried using this.headerParse
// and this.headerParse.bind(this)
// and this.headerParse()
this.stream.removeListener('data', self.headerParse);
//Attaching the event listener to process new data after the header
this.stream.on('data', self.variantParse.bind(self));
this.header.push(line);
this.emit('header');
console.log('header');
return;
}
this.header.push(line);
}
variantParse(line){
let contig = line.split('\t')[0];
if(!this.variants.hasOwnProperty(contig)){
this.variants[contig] = [];
}
this.variants[contig].push(line);
}
/*
I've also tried using the below class method,
calling this.removeHeaderListener() instead of
this.stream.removeListener('data', self.headerParse);
*/
removeHeaderListener(){
this.stream.removeListener('data', this.headerParse);
}
}
module.exports = VCFStream;
If I stick a console.log(line)
in the headerParse()
, method, it continues to log every line, event after the listener has been removed.