
I have a series of callbacks and events that need to run as one sequential flow, but I cannot figure out how to organize the code to accomplish that.

A simplified version of the problem and code is as follows.

I have a list of zips on a server that I download to the local filesystem:

const zipFiles: Image[] = [/* very, very long list of downloaded files */];
for (let i = 0; i < zipFiles.length; i++) {
  await processZipForExtraction(zipFiles[i]);
}

I want this await to actually wait until all of the async processing below has finished:

private async processZipForExtraction (file: Image): Promise<any> {
  return new Promise(async (resolve, reject) => {
    let writeStream = fs.createWriteStream(file.name);
    readStream.pipe(writeStream);

    writeStream.on('finish', () => {

      // open zips
      var zip = new StreamZip({
        file: './' + file.name,
        storeEntries: true
      });

      zip.on('ready', () => {
        console.log('All entries read');
        // this only means the below event got fired for each file, and I get a count
      });

      zip.on('entry', (entry) => {
        if (entry.name.endsWith('')) {

          zip.stream(entry.name, (err, stream) => {
            if (err) reject(err);

            let uploadStream = this.uploadFromStream(entry.name);
            uploadStream.on('success', async () => {
              // this also means this entry in the zip is "done"
            });

            stream.pipe(uploadStream)
          });
        }
        else {
          // this also means this entry in the zip is "done"
        }
      });
    });
  });
}

The difficulty is that I want to wait until each entry in the zip is "done". Sometimes an entry involves an async stream event (when it matches and gets uploaded), and sometimes it doesn't. How can I reorganize the logic so that it waits for the following steps?

  1. Download each zip
  2. View each zip entry
  3. If it matches, process and upload stream
  4. Once all entries are both processed AND successfully uploaded: (a) delete the zip, and (b) resolve the original processZipForExtraction (so that only one zip is being processed at a time)

I have several thousand zips and each is multiple gigabytes, so I want to download only one, process it, delete it, and then start on the next one.

I'm sure I can reorganize the logic so that everything can be awaited, but I can't figure out where or how. The zip.on 'entry' handler itself contains another stream event listener that I need to wait for, and I don't know how many files are in the zip until the 'ready' event fires.

Thanks in advance!

Joshua Ohana
  • Hi Joshua Ohana, I found this https://stackoverflow.com/questions/24586110/resolve-promises-one-after-another-i-e-in-sequence. – ian00 Nov 30 '20 at 04:06
  • My first take on this is that you will have to keep track of each `uploadStream` that you open and when it closes. Once the zip stream's `ready` event fires (so you know that there are no more upload streams yet to open), you can resolve your promise once the last `uploadStream` closes. A bit of a juggling act, and I feel like there is a better abstraction to be found (rather than counting things and updating state in event handlers), but it eludes me at the moment... If you confirm I am on the right track, I can post some code to riff on. – Henry Mueller Nov 30 '20 at 04:12
  • One possible simplification: can you use the "zip.entries() - get all entries description" method to get all the entries in a list, rather than listening to the `entry` and `ready` events? To me, this is easier to reason about, but it gives the same end result. – Henry Mueller Nov 30 '20 at 04:20

1 Answer

Following up on my comments, here is what I came up with. It combines two changes:

  1. Use the StreamZip entries method to get the entries into a list rather than listening to the entry event.
  2. Wrap each upload stream async activity in its own promise, then use Promise.all to await all uploads completing. (You could also keep track of the uploads manually in your own state, up to you.)

This code is not tested and probably has some typos, but hopefully it gives you some ideas.

private processZipForExtraction(file: Image): Promise<string> {
    return new Promise((resolve, reject) => {
        let writeStream = fs.createWriteStream(file.name);
        readStream.pipe(writeStream);

        writeStream.on('finish', () => {

            // open zips
            var zip = new StreamZip({
                file: './' + file.name,
                storeEntries: true
            });

            // To make it easier to keep track of the status of each async
            // request, wrap each upload in its own promise. This uses the
            // existing abstraction of Promises in lieu of adding some sort
            // of state (probably a counter) to this class. Either works;
            // the choice is yours.
            const uploadEntry = (name) =>
                new Promise((resolveUpload, rejectUpload) => {
                    zip.stream(name, (err, stream) => {
                        // Stop here on error so we never pipe an undefined stream.
                        // Promise.all below turns this into a rejection of the entire zip file processing.
                        if (err) return rejectUpload(err);

                        let uploadStream = this.uploadFromStream(name);
                        uploadStream.on('success', resolveUpload);
                        // uploadStream must have a failure event for this to work, but since `success`
                        // is a non-standard stream event, I don't want to assume the name of the failure event.
                        uploadStream.on('THENAMEOFYOURFAILEVENT', rejectUpload);

                        stream.pipe(uploadStream)
                    });
                })

            zip.on('ready', async () => {
                console.log('All entries read');

                // This line is based on reading the tests of `node-stream-zip`.
                // Apparently `.entries()` returns an object whose property keys
                // are the file names. Not (at all) tested.
                const entryNameList = Object.keys(zip.entries());

                // Filter the entries you want to upload, then pass the entry
                // names to the `uploadEntry` to get the Promises wrapping each
                // async streaming operation.
                const uploadPromiseList = entryNameList
                    .filter(name => name.endsWith('')) // '' is from the original code; put your real suffix here
                    .map(uploadEntry);

                // Wait for all the uploads to resolve. This assumes you want to
                // reject if any upload fails, but you could also use
                // Promise.allSettled if failed uploads don't matter.
                try {    
                    await Promise.all(uploadPromiseList);
                    resolve('success');
                } catch (err) {
                    reject(err);
                }
            });

        });
    });
}
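
The pattern at the heart of the code above, turning a pair of success/failure events into a promise, can be pulled into a small helper. The following is only a sketch: `settleOnEvent` is a hypothetical name (it is not part of node-stream-zip or of the original code), and it assumes the emitter reports failure through a named event that you pass in.

```typescript
import { EventEmitter } from 'events';

// Hypothetical helper: resolves on the first `successEvent`, rejects on the
// first `failureEvent`, and removes both listeners once either one fires.
function settleOnEvent(
    emitter: EventEmitter,
    successEvent: string,
    failureEvent: string
): Promise<void> {
    return new Promise<void>((resolve, reject) => {
        function cleanup(): void {
            emitter.off(successEvent, onSuccess);
            emitter.off(failureEvent, onFailure);
        }
        function onSuccess(): void {
            cleanup();
            resolve();
        }
        function onFailure(err: unknown): void {
            cleanup();
            // Normalize whatever the emitter passed into an Error.
            reject(err instanceof Error ? err : new Error(String(err)));
        }
        emitter.once(successEvent, onSuccess);
        emitter.once(failureEvent, onFailure);
    });
}
```

With a helper like this, the part of `uploadEntry` that follows `stream.pipe(uploadStream)` could reduce to waiting on `settleOnEvent(uploadStream, 'success', 'THENAMEOFYOURFAILEVENT')`, with the failure event name still to be filled in for your upload stream.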

You can add the code to delete the file after the Promise.all or outside this method after it resolves.
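
As a rough sketch of the second option (deleting outside the method), the calling loop could look like the following. `processAllSequentially` and its `processZip` parameter are hypothetical names standing in for the loop in the question and for `processZipForExtraction`. The sketch assumes each zip is identified by its local path and should be deleted only after it was processed successfully.

```typescript
import { promises as fsp } from 'fs';

// Hypothetical stand-in for processZipForExtraction: resolves once every
// matching entry of the zip at `zipPath` has been uploaded, rejects otherwise.
type ProcessZip = (zipPath: string) => Promise<unknown>;

// Processes the zips strictly one at a time and deletes each local copy
// after it has been processed successfully.
async function processAllSequentially(
    zipPaths: string[],
    processZip: ProcessZip
): Promise<void> {
    for (const zipPath of zipPaths) {
        // The next iteration does not start until this promise settles.
        await processZip(zipPath);
        // Only reached on success; a rejection above leaves the file in
        // place and stops the loop by propagating the error to the caller.
        await fsp.unlink(zipPath);
    }
}
```

Because the deletion is awaited inside the loop, at most one zip should be on disk at any time, which seems to be what matters with multi-gigabyte files.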

Henry Mueller