
I'm getting a bit mixed up trying to stream a CSV, make an HTTP request for each row, and have everything execute and log to the console in the "proper" order. Ultimately, I think I'm not wrapping my promises right, or...?

const getUserByEmail = async (email) => {
  const encodedEmail = encodeURIComponent(email);

  try {
    const response = await http.get(`users?email=${encodedEmail}`);
    const userId = response.data.data[0] && response.data.data[0].id;

    return (userId ? userId : `${email} not found`);
  } catch (error) {
    console.error('get user error: ', error);
  }
};

const run = async () => {
  console.log('==> Reading csv ...');

  const promises = [];
  const readStream = fs.createReadStream('import-test.csv')
    .pipe(csv.parse({ headers: true }))
    .on('error', (error) => console.error('stream error: ', error))
    .on('data', (row) => {
      promises.push(getUserByEmail(row.email));
    })
    .on('end', rowCount => {
      console.log(`==> Parsed ${rowCount} rows from csv ...`);
    })

  await Promise.all(promises)
    .then(values => console.log(values))

  console.log('==> End of script')
};

run();

I'm attempting (and expecting) the code above to take each row of the csv, push each http call (a promise) to an array of promises, and have everything execute and log to the console in that order.

This is my actual output:

==> Reading csv...
[]
==> End of script
==> Parsed 10 rows from csv ...

And this is what I'm expecting:

==> Reading csv...
==> Parsed 10 rows from csv ...
[
  QyDPkn3WZp,
  e75KzrqYxK,
  iqDXoEFMZy,
  PstouMRz3y,
  w188hLyeT6,
  g18oxMOy6l,
  8wjVJutFnh,
  fakeEmail@fakeDomain.com not found,
  QEHaG3cp7d,
  y8I4oX6aCe
]
==> End of script

The biggest issue for me is that anything is logging after "==> End of script", which indicates to me that I don't have a strong grasp of when/why all previous events are logging in the order that they are.

Ultimately (I haven't gotten there yet) I'd also like to buffer/throttle these requests to 100 per minute, otherwise this particular API will rate-limit me.

Thank you!

burgerpe24
  • By "proper" order do you mean that you want each http request to be made in the order they appear in the table? If so, `Promise.all()` executes all the promises simultaneously, so the order in which they finish is basically random. You could loop through each line and `await` each request before making the next. – QuinnFreedman Mar 07 '21 at 22:38
  • Thanks for taking a look @QuinnFreedman—much appreciated. I could have been more clear on that part. The order in the table doesn't matter much—by "proper" order my primary concern was that the stream was still logging after my "end of script" console log. – burgerpe24 Mar 08 '21 at 13:53

1 Answer


The whole readStream chain, all the way down to await Promise.all(promises), runs synchronously. The data event is asynchronous and populates promises on a later turn of the event loop, so promises is still an empty array when you call Promise.all: you are not waiting for the stream to end. You might want to put your logic in the end event instead, like this:

const run = async () => {
  console.log('==> Reading csv ...');

  const promises = [];
  const readStream = fs.createReadStream('import-test.csv')
    .pipe(csv.parse({ headers: true }))
    .on('error', (error) => console.error('stream error: ', error))
    .on('data', (row) => {
      promises.push(getUserByEmail(row.email));
    })
    .on('end', async rowCount => {
      await Promise.all(promises)
        .then(values => console.log(values))

      console.log('==> End of script')
    })
}

Another, easier way to go about it is to use an async iterator: the readStream has a Symbol.asyncIterator, so you can consume it with for await...of

const run = async () => {
  console.log('==> Reading csv ...');

  let rowCount = 0
  const promises = []
  const readStream = fs.createReadStream('import-test.csv')
    .pipe(csv.parse({ headers: true }))
  
  for await (let row of readStream) {
    rowCount++
    promises.push(getUserByEmail(row.email));
  }
    
  console.log(`==> Parsed ${rowCount} rows from csv ...`)

  await Promise.all(promises).then(console.log)

  console.log('==> End of script')
}

I would have gone even further and limited the concurrency:

const run = async () => {
  console.log('==> Reading csv ...');

  const result = []
  const readStream = fs.createReadStream('import-test.csv')
    .pipe(csv.parse({ headers: true }))
  
  for await (let row of readStream) {
    result.push(await getUserByEmail(row.email))
  }

  console.log(result)
  console.log('==> End of script')
}

If you want to increase the concurrency of an async iterator, take a look at this post, but beware: the results could come back out of order with that method.
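
Since the question also mentions needing to stay under 100 requests per minute, one simple option is to fire the requests in fixed-size batches with a pause between batches. This is only a minimal sketch, not a production rate limiter: it assumes the csv parser is fast-csv (the parse({ headers: true }) call matches its API), that getUserByEmail is the function from the question, and that the batch size and delay are arbitrary numbers you would tune.

const fs = require('fs');
const csv = require('fast-csv'); // assumption: the parser used above is fast-csv

const BATCH_SIZE = 10;       // requests in flight per batch (arbitrary)
const DELAY_MS = 6 * 1000;   // 10 batches of 10 per minute ≈ 100 requests/min

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

const run = async () => {
  console.log('==> Reading csv ...');

  // Collect the rows first via the async iterator, as in the snippets above.
  const rows = [];
  const readStream = fs.createReadStream('import-test.csv')
    .pipe(csv.parse({ headers: true }));

  for await (const row of readStream) {
    rows.push(row);
  }

  console.log(`==> Parsed ${rows.length} rows from csv ...`);

  // Fire the requests batch by batch. Promise.all preserves the order of its
  // input array, so the results stay in row order.
  const results = [];
  for (let i = 0; i < rows.length; i += BATCH_SIZE) {
    const batch = rows.slice(i, i + BATCH_SIZE);
    const values = await Promise.all(batch.map((row) => getUserByEmail(row.email)));
    results.push(...values);

    if (i + BATCH_SIZE < rows.length) {
      await sleep(DELAY_MS); // crude throttle between batches
    }
  }

  console.log(results);
  console.log('==> End of script');
};

This keeps the results in row order and never has more than BATCH_SIZE requests in flight, at the cost of waiting for the slowest request in each batch before starting the next one.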

Endless