0

I have created 3 read streams that read data from 3 tables in a database in JSON format. Now I need to do four things with each stream, in sequence:

  1. Create an MD5 checksum of the stream.
  2. Save the stream to the filesystem as a JSON file, with the checksum at the end.
  3. Read each line of the stream, count it, and detect which records are corrupt; basically, JSON.parse will do the job.
  4. Save all the outputs in a txt file (checksum, error count, total count, etc.).

I have tried a number of code combinations, but none has worked 100%. The closest I could get was to finish all of the steps, except that the stream written to the filesystem had an empty checksum value.

    async processStream(streamable: Streamable): Promise<number> {
            let checkSum = "";
            const passthroughStream = new PassThrough();
            const passthroughStreamcheckSum = new PassThrough();
            
            let count = 0;
            let countError = 0;
            const md5Checksum = crypto.createHash("md5");
            const lineStream: Interface = readline.createInterface({
              input: passthroughStream
            });
        
            lineStream.on("line", line => {
              if (line) {
                try {
                  count++;
                  JSON.parse(line);
                } catch (error) {
                  countError++;
                }
              }
            });
        
            passthroughStreamcheckSum.on("data", data => {
              md5Checksum.update(data);
            });
            passthroughStreamcheckSum.on("end", () => {
              checkSum = md5Checksum.digest("hex");
            });
        
            streamable.stream.pipe(passthroughStreamcheckSum);
            streamable.stream.pipe(passthroughStream);
    
            const createFilePromise = this.flushStream({
              stream: passthroughStream,
              key: 'KKK'
            }, checkSum);
        
            const processRecords = this.processEacchRecord(
              lineStream,
              'iii'
            );
            
            await once(passthroughStreamcheckSum, "end");
            await Promise.all([createFilePromise, processRecords]);
            await this.createTXTFile(checkSum, countError, count);
            return count;
          }
        
          private async processRecords(
            lineStream: readline.Interface,
            keyForAvro: string
          ) {
            await once(lineStream, "close");
            await this.flushString('filename',   [array. of error records]);
           
          }
        
          private async createTXTFile(
            checksum: string,
            errorcount: string,
            count: number
          ) {
            const data = {"c": checksum, "e":errorcount, "c":count };
            await this.flushString('filename', data);
          }

async flushStream(streamable: Streamable, checksum: string) {
    //checksum is always empty ''
    const stream = streamable.stream;
    const fileOut = fs.createWriteStream(file);
    stream.pipe(fileOut);
    await once(stream, "end");
    fileOut.end();
  }
user5740953
  • 169
  • 2
  • 16
  • Does this answer your question? [How to wait for a stream to finish piping? (Nodejs)](https://stackoverflow.com/questions/37837132/how-to-wait-for-a-stream-to-finish-piping-nodejs) – Smit Gajera Dec 02 '21 at 06:47
  • the code is all good as mention in the post, just that stream piping code 'stream.pipe(fileOut);' should be above the await promises, piping should not wait. That was the reason I got no data in my files after it was closed/finished. – user5740953 Dec 02 '21 at 23:47
  • @SmitGajera Actually no. I have read this post. In this we are using promise.all, which returns a single promise for all the promises in the iteration. I do not need this, I need use the value return by one promise to be used by second promise and hencefore, which is easy, but it is the streams and their piping which is hard to get through. – user5740953 Dec 09 '21 at 08:54

0 Answers0