I have create 3 read stream, that reads data form 3 tables in a database in JSON format. Now I need to complete three things with this stream and that too in sequence.
- Create md5 cheksum of the stream.
- save the stream in the filesystem in a JSON file with the checksum in the end.
- Read each line of the stream count it and see which records are corrupt, basically JSON.parse will do the job.
- save all the outputs in a txt file(checksum, error count, total count etc).
I have tried number of code combinations, but none have worked 100%, the closest I could get was to finish all the things, except stream written to file system had empty checksum value.
async processStream(streamable: Streamable): Promise<number> {
let checkSum = "";
const passthroughStream = new PassThrough();
const passthroughStreamcheckSum = new PassThrough();
let count = 0;
let countError = 0;
const md5Checksum = crypto.createHash("md5");
const lineStream: Interface = readline.createInterface({
input: passthroughStream
});
lineStream.on("line", line => {
if (line) {
try {
count++;
JSON.parse(line);
} catch (error) {
countError++;
}
}
});
passthroughStreamcheckSum.on("data", data => {
md5Checksum.update(data);
});
passthroughStreamcheckSum.on("end", () => {
checkSum = md5Checksum.digest("hex");
});
streamable.stream.pipe(passthroughStreamcheckSum);
streamable.stream.pipe(passthroughStream);
const createFilePromise = this.flushStream({
stream: passthroughStream,
key: 'KKK'
}, checkSum);
const processRecords = this.processEacchRecord(
lineStream,
'iii'
);
await once(passthroughStreamcheckSum, "end");
await Promise.all([createFilePromise, processRecords]);
await this.createTXTFile(checkSum, countError, count);
return count;
}
private async processRecords(
lineStream: readline.Interface,
keyForAvro: string
) {
await once(lineStream, "close");
await this.flushString('filename', [array. of error records]);
}
private async createTXTFile(
checksum: string,
errorcount: string,
count: number
) {
const data = {"c": checksum, "e":errorcount, "c":count };
await this.flushString('filename', data);
}
async flushStream(streamable: Streamable, checksum: string) {
//checksum is always empty ''
const stream = streamable.stream;
const fileOut = fs.createWriteStream(file);
stream.pipe(fileOut);
await once(stream, "end");
fileOut.end();
}