63

I have an array of promises that I process in a for loop, so I used Promise.all to wait for all of them and handle the responses in then.

let promises = [];
promises.push(promise1);
promises.push(promise2);
promises.push(promise3);

Promise.all(promises).then((responses) => {
  for (let i = 0; i < promises.length; i++) {
    if (responses[i].property === something) {
      // do something
    } else {
      let file = fs.createWriteStream('./hello.pdf');
      let stream = responses[i].pipe(file);
      /*
         I WANT THE PIPING AND THE FOLLOWING CODE
         TO RUN BEFORE NEXT ITERATION OF FOR LOOP
      */
      stream.on('finish', () => {
        // extract the text out of the pdf
        extract(filePath, {splitPages: false}, (err, text) => {
          if (err) {
            console.log(err);
          } else {
            arrayOfDocuments[i].text_contents = text;
          }
        });
      });
    }
  }
});

promise1, promise2, and promise3 are HTTP requests. If one of the responses is application/pdf, I write it to a stream and parse the text out of it. But this code runs the next iteration before the text has been parsed out of the PDF. Is there a way to make the code wait until the piping to the stream and the extraction are finished before moving on to the next iteration?

ThePumpkinMaster

4 Answers

44

Without async/await, it's quite nasty. With async/await, just do this:

Promise.all(promises).then(async (responses) => {
  for (...) {
    await new Promise(resolve => stream.on("finish", resolve));
    //extract the text out of the PDF
  }
})
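A fuller sketch of the same idea, assuming each response is a readable stream. The names `pipeToFile` and `writeSequentially` are hypothetical, the file naming is made up for illustration, and the PDF extraction step is left out. The promise also rejects on stream errors, so a failed write does not leave the `await` hanging:

```javascript
const { Readable } = require('stream');
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical helper: resolves when the file stream emits 'finish',
// rejects on the first error from either side of the pipe.
function pipeToFile(source, filePath) {
  return new Promise((resolve, reject) => {
    const file = fs.createWriteStream(filePath);
    source.on('error', reject);
    file.on('error', reject);
    file.on('finish', resolve);
    source.pipe(file);
  });
}

// Writes each source to its own file, one after another.
async function writeSequentially(sources, dir) {
  const contents = [];
  for (let i = 0; i < sources.length; i++) {
    const filePath = path.join(dir, `doc-${i}.txt`);
    // The loop pauses here until the current file is fully written.
    await pipeToFile(sources[i], filePath);
    // Safe to read (or extract text from) the file at this point.
    contents.push(fs.readFileSync(filePath, 'utf8'));
  }
  return contents;
}
```

Because the `await` sits inside the loop body, iteration `i + 1` does not start until the file for iteration `i` has finished writing.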
Mike 'Pomax' Kamermans
Sarsaparilla
  • Note for others: the `finish` event only triggers if the caller handles the stream properly. If not (e.g. AWS SDK S3 uploads) then you can use the `close` event instead, to avoid the `await` sitting there forever. – Malvineous Oct 11 '18 at 08:47
  • can you help me understand the difference between `return new Promise()` and `await new Promise()`? in your code sample, i believe the former *would not* have the desired effect of pausing program execution until the finish event has fired for each iteration, whereas the latter would. why is that? (in my particular case, I am using `readable.pipe(writable)` within a loop and finding iteration does not pause unless I wrap the pipe in `await new Promise()`) – user1063287 Aug 15 '19 at 01:28
  • i'm not sure what you're asking. But calling "await" inside the loop will pause execution of the loop until the promise resolves, i.e. until the `pipe` finishes. So you would be piping the streams sequentially one after another. – Sarsaparilla Aug 16 '19 at 05:26
  • It's not good practice to not listen for streams' error events. – sean Jan 04 '20 at 13:21
  • Thanks very much ... I had some seemingly simple code that wasn't working due to streams not flushing. After hours of different attempts, I found this and it worked. Cheers! z – J.Z. Jul 01 '20 at 03:53
18

Something like the following would also work. I use this pattern fairly often:

let promises = [];
promises.push(promise1);
promises.push(promise2);
promises.push(promise3);

function doNext(){
  if(!promises.length) return;
  promises.shift().then((resolved) =>{
    if(resolved.property === something){
      ...
      doNext();
    }else{
      let file = fs.createWriteStream('./hello.pdf');
      let stream = resolved.pipe(file);
      stream.on('finish', () =>{
        ...
        doNext();
      });
    }

  })
}
doNext();

Or split the handler into a controller and a promisified handler:

function streamOrNot(obj){
  return new Promise((resolve, reject) => {
    if(obj.property === something){
      resolve();
      return;
    }
    let file = fs.createWriteStream...;
    stream.on('finish', () =>{
      ...
      resolve();
    });
  });
}

function doNext(){
  if(!promises.length) return;
  return promises.shift().then(streamOrNot).then(doNext);
}

doNext()
bknights
  • This is the best answer imo, using promises is overkill in this context imo, and constrictive (forcing synchronisation which isn't in the OP ) – Jamie Nicholl-Shelley May 07 '19 at 06:18
  • It's not good practice to not listen for streams' error events. – sean Jan 04 '20 at 13:21
  • indeed. It's an example of a technique not fully worked production code. In particular the implementor would need to decide whether to reject on errors or handle them and let all the other streams finish. – bknights Jan 04 '20 at 21:59
8

Use await with stream.pipeline() instead of stream.pipe():

import * as StreamPromises from "stream/promises";

...
await StreamPromises.pipeline(sourceStream, destinationStream);
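As a rough sketch of how this could look inside the loop from the question, written with CommonJS `require` rather than the `import` above. The function name `saveAll` and the file naming are assumptions made for illustration, and each source is assumed to be a readable stream:

```javascript
const { pipeline } = require('stream/promises');
const { Readable } = require('stream');
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical helper: writes each source to its own file, in order,
// and returns the size of each written file in bytes.
async function saveAll(sources, dir) {
  const sizes = [];
  for (const [i, source] of sources.entries()) {
    const filePath = path.join(dir, `response-${i}.bin`);
    // Resolves once the write stream has finished;
    // rejects if either stream emits an error.
    await pipeline(source, fs.createWriteStream(filePath));
    sizes.push(fs.statSync(filePath).size);
  }
  return sizes;
}
```

Unlike a bare `pipe()`, `pipeline()` forwards errors from every stream in the chain, so no separate `error` listeners are needed here.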
Christian d'Heureuse
  • This is Node 16+ only sadly, those of us still requiring 14 must promisify the pipeline method instead. – Justin Grote Sep 07 '21 at 17:50
  • Finding issues with subpipes. For example, let's say I have Stream1, Stream2, StreamA, and StreamB where StreamB.on('finish', () => console.log('Done 1'); StreamAB = StreamA.pipe(StreamB); And I do await StreamPromises.pipeline([Stream1, Stream2, StreamAB]); console.log('Done 2'); This prints Done 2, then Done 1 instead of waiting for subpipes to complete. – Sam Araiza Aug 10 '22 at 16:00
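For Node versions without `stream/promises`, the promisified variant mentioned in the comments might look like the sketch below. `pipelineAsync` and `collectText` are hypothetical names, and the in-memory `Writable` only stands in for a real file stream:

```javascript
const util = require('util');
const stream = require('stream');

// Wrap the callback-style stream.pipeline in a promise.
const pipelineAsync = util.promisify(stream.pipeline);

// Hypothetical helper: drains a readable stream into memory
// and returns everything it produced as one string.
async function collectText(source) {
  const chunks = [];
  const sink = new stream.Writable({
    write(chunk, _encoding, callback) {
      chunks.push(chunk.toString());
      callback();
    },
  });
  // Resolves after the sink has finished; rejects on any stream error.
  await pipelineAsync(source, sink);
  return chunks.join('');
}
```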
4

You can wrap the else branch in an immediately invoked function, so that each iteration gets its own scope and the streams are handled in parallel:

(function(i) {
  let file = fs.createWriteStream('./hello.pdf');
  let stream = responses[i].pipe(file);
  /*
     I WANT THE PIPING AND THE FOLLOWING CODE
     TO RUN BEFORE NEXT ITERATION OF FOR LOOP
  */
  stream.on('finish', () => {
    // extract the text out of the pdf
    extract(filePath, {splitPages: false}, (err, text) => {
      if (err) {
        console.log(err);
      } else {
        arrayOfDocuments[i].text_contents = text;
      }
    });
  });
})(i);

Alternatively, you can handle the streaming as part of each individual promise.

Right now you create each promise and add it to the array. Instead, add promise.then(...) to the array (the result of then is also a promise), and do the streaming inside that then handler.
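A minimal sketch of that second approach, assuming each promise resolves to a readable stream. `memorySink`, `withStreaming`, and `handleAll` are hypothetical names, and the in-memory sink stands in for `fs.createWriteStream` so the example is self-contained:

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical in-memory sink: appends everything written to store[key].
function memorySink(store, key) {
  store[key] = '';
  return new Writable({
    write(chunk, _encoding, callback) {
      store[key] += chunk.toString();
      callback();
    },
  });
}

// Chains the streaming onto one promise. The returned promise settles
// only after that response has been fully written (or has failed).
function withStreaming(promise, store, key) {
  return promise.then((response) => new Promise((resolve, reject) => {
    const sink = memorySink(store, key);
    response.on('error', reject);
    sink.on('error', reject);
    sink.on('finish', () => resolve(key));
    response.pipe(sink);
  }));
}

// All streams run in parallel; Promise.all waits for every one to finish.
function handleAll(promises, store) {
  return Promise.all(
    promises.map((promise, i) => withStreaming(promise, store, `doc${i}`))
  );
}
```

With this shape, the `then` after `Promise.all` runs only once every stream has finished, although the streams themselves are not processed one after another.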

Oxi
  • This doesn't finish the stream before the next iteration. I still get that promise3 gets called before promise2 finishes writing to the stream. – ThePumpkinMaster Jun 15 '16 at 14:40
  • It doesn't wait, but when the next iteration in for loop happens, it won't overwrite the stream from previous and everything works parallel – Oxi Jun 15 '16 at 14:46
  • What if I wanted to use that stream after the for loop was done? Like how do I use promises here to do that? – ThePumpkinMaster Jun 15 '16 at 15:01
  • That's the beauty of closures: the enclosing function preserves the `stream` and any other internal variables even after the `for loop` is done, and each iteration of the for loop creates a different stream. This means that after the for loop has completed, the finish event will still be triggered for each stream you created. – Oxi Jun 15 '16 at 15:49