
I have a function that takes a list of IDs, converts them into an array of URLs, then uses map to fire a fetch request for each one. It works great, but it fires too fast and the provider throws errors because we hit the API too often. I need to space the requests out, but every time I try, it doesn't really work. Ideas?

async function getReports(reportIDs) {
    const urls = reportIDs.map(id => `https://api.data.com/api/v1/report/${id}/?include_datasets=true`);
    const requests = urls.map(url => fetch(url, {
        method: 'GET',
        headers: { 'api-key': key }
    }).then(res => res.json()));
    
    const responses = await Promise.all(requests).catch(err => console.error(err));
    return responses;
}

I use a promise so I can await the results of the function inside another function to transform the datasets.

Wells

4 Answers


“Simplicity is a great virtue but it requires hard work to achieve it and education to appreciate it. And to make matters worse: complexity sells better.” – Edsger W. Dijkstra

The accepted "lightweight" solution is nearly 20,000 lines of code and depends on both CoffeeScript and Lua. What if you could trade all of that for just 50 lines of JavaScript?

Let's say we have some job that takes some amount of time to compute some result -

async function job(x) {
  // job consumes some time
  await sleep(rand(5000))
  // job computes a result
  return x * 10
}

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(job))
  .then(console.log, console.error)
[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]

This runs all twelve (12) jobs at once. If these were requests to a remote server, some connections could be rejected because you are flooding it with too much simultaneous traffic. By modeling a Pool of threads, we control the flow of the parallelized jobs -

// my pool with four threads
const pool = new Pool(4)

async function jobQueued(x) {
  // wait for pool thread
  const close = await pool.open()
  // run the job and close the thread upon completion
  return job(x).then(close)
}

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(jobQueued))
  .then(console.log, console.error)
[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]

Functions should be small and do just one thing. This makes it easier to write individual features and promotes a higher degree of reusability, allowing you to combine several simple features into more sophisticated ones. Above you already saw rand and sleep -

const rand = x =>
  Math.random() * x

const sleep = ms =>
  new Promise(r => setTimeout(r, ms))

If we want to throttle each job, we can specialize sleep to ensure a minimum runtime -

const throttle = (p, ms) =>
  Promise.all([ p, sleep(ms) ]).then(([ value, _ ]) => value)

async function jobQueued(x) {
  const close = await pool.open()
  // ensure job takes at least 3 seconds before freeing thread
  return throttle(job(x), 3000).then(close)
}

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(jobQueued))
  .then(console.log, console.error)
[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]

We can add some console.log messages to ensure things are running properly. And we will add a random sleep at the beginning of the job to show that the tasks can queue in any order without affecting the order of the result -

async function jobQueued(x) {
  await sleep(rand(5000))
  console.log("queueing", x)
  const close = await pool.open()
  console.log("  sending", x)
  const result = await throttle(job(x), 3000).then(close)
  console.log("    received", result)
  return result
}

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(jobQueued))
  .then(console.log, console.error)
console.log thread 1 thread 2 thread 3 thread 4
queueing 12
   sending 12 open
queueing 9
   sending 9 open
queueing 8
   sending 8 open
queueing 4
   sending 4 open
queueing 10
queueing 6
queueing 7
queueing 2
queueing 11
      received 120 closed
   sending 11 open
queueing 3
queueing 5
queueing 1
      received 80 closed
   sending 1 open
      received 90 closed
   sending 5 open
      received 110 closed
   sending 3 open
      received 40 closed
   sending 2 open
      received 10 closed
   sending 7 open
      received 50 closed
   sending 6 open
      received 20 closed
   sending 10 open
      received 30 closed
      received 70 closed
      received 60 closed
      received 100 closed
[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]

Above, our pool was initialized with size=4 so up to four jobs will run concurrently. After we see sending four times, a job must be completed and we see received before the next job begins. queueing can happen at any time. You may also notice Pool processes queued jobs using an efficient last-in-first-out (LIFO) order but the order of the result is maintained.
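That LIFO behavior comes from Pool taking waiters off its stack with pop(); swapping pop() for shift() would give first-in-first-out dispatch instead. A quick sketch of the difference on a plain array -

```javascript
// Pool releases waiting jobs with stack.pop(), i.e. the most recently
// queued waiter runs first (LIFO). Swapping pop() for shift() releases
// the oldest waiter first (FIFO) instead. The difference on a plain array:
const waiting = ["job A", "job B", "job C"]

console.log(waiting.pop())   // "job C" (LIFO, what Pool does)
console.log(waiting.shift()) // "job A" (FIFO alternative)
```

Either way, Promise.all preserves the order of the results; only the dispatch order of the waiting jobs changes.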

Moving on with our implementation, like our other functions, we can write thread in a simple way -

const effect = f => x =>
  (f(x), x)

const thread = close =>
  [new Promise(r => { close = effect(r) }), close]

function main () {
  const [t, close] = thread()
  console.log("please wait...")
  setTimeout(close, 3000)
  return t.then(_ => "some result")
}

main().then(console.log, console.error)
please wait...
(3 seconds later)
some result

And now we can use thread to write more sophisticated features like Pool -

class Pool {
  constructor (size = 4) {
    Object.assign(this, { pool: new Set, stack: [], size })
  }
  open () {
    return this.pool.size < this.size
      ? this.deferNow()
      : this.deferStacked()
  }
  deferNow () {
    const [t, close] = thread()
    const p = t
      .then(_ => this.pool.delete(p))
      .then(_ => this.stack.length && this.stack.pop().close())
    this.pool.add(p)
    return close
  }
  deferStacked () {
    const [t, close] = thread()
    this.stack.push({ close })
    return t.then(_ => this.deferNow())
  }
}

And just like that your program is complete. In the functioning demo below, I condensed the definitions so we can see them all at once. Run the program to verify the result in your own browser -

class Pool {
  constructor (size = 4) { Object.assign(this, { pool: new Set, stack: [], size }) }
  open () { return this.pool.size < this.size ? this.deferNow() : this.deferStacked() }
  deferNow () { const [t, close] = thread(); const p = t.then(_ => this.pool.delete(p)).then(_ => this.stack.length && this.stack.pop().close()); this.pool.add(p); return close }
  deferStacked () { const [t, close] = thread(); this.stack.push({ close }); return t.then(_ => this.deferNow()) }
}
const rand = x => Math.random() * x
const effect = f => x => (f(x), x)
const thread = close => [new Promise(r => { close = effect(r) }), close]
const sleep = ms => new Promise(r => setTimeout(r, ms))
const throttle = (p, ms) => Promise.all([ p, sleep(ms) ]).then(([ value, _ ]) => value)

const myJob = x => sleep(rand(5000)).then(_ => x * 10)
const pool = new Pool(4)

async function jobQueued(x) {
  await sleep(rand(5000))
  console.log("queueing", x)
  const close = await pool.open()
  console.log("  sending", x)
  const result = await throttle(myJob(x), 3000).then(close)
  console.log("    received", result)
  return result
}

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(jobQueued))
  .then(JSON.stringify)
  .then(console.log, console.error)

Hopefully you learned something fun about JavaScript! If you enjoyed this, try expanding on Pool features. Maybe add a simple timeout function that ensures a job completes within a certain amount of time. Or maybe add a retry function that re-runs a job if it produces an error or times out. To see Pool applied to another problem, see this Q&A. If you have any questions, I'm happy to assist :D
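A minimal sketch of those two extensions, reusing the sleep helper from above (timeout and retry are hypothetical names, not part of the code in this answer) -

```javascript
const sleep = ms => new Promise(r => setTimeout(r, ms))

// timeout(p, ms) rejects if p has not settled within ms milliseconds
const timeout = (p, ms) =>
  Promise.race([ p, sleep(ms).then(() => { throw Error("timeout") }) ])

// retry(f, n) re-runs a failing job up to n times before giving up
const retry = async (f, n) => {
  try { return await f() }
  catch (e) {
    if (n <= 1) throw e
    return retry(f, n - 1)
  }
}
```

jobQueued could then wrap its work as retry(() => timeout(job(x), 10000), 3), giving each job up to three attempts with a ten-second limit per attempt.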

Mulan
  • Your answer is fantastic! It is one of the best answers that I've seen. I appreciate you spending your time writing this complete answer. – S. Hesam Mar 15 '22 at 06:23
  • 1
    @S.Hesam I'm delighted to share. Thanks for your encouraging comment :D – Mulan Apr 04 '22 at 15:52

First modify your code so it calls a function called waitAndFetch(). You'll be passing in the URL plus the index value of that element in the array (0,1,2,3...):

async function getReports(reportIDs) {
    const urls = reportIDs.map(id => `https://api.data.com/api/v1/report/${id}/?include_datasets=true`);
    
    const requests = urls.map((url, i) => waitAndFetch(url, i));
    
    const responses = await Promise.all(requests).catch(err => console.error(err));
    return responses;
}

Next, create a function that simply returns a promise that resolves in 1000 x the index value milliseconds (0, 1sec, 2sec, 3sec...):

const wait = (i) => {
    const ms = i * 1000;
    return new Promise((resolve, reject) => {
        setTimeout(resolve, ms);
    });
};

Now write waitAndFetch. It will call wait(), but won't care about its return value. It only cares that it has to wait for it to resolve:

const waitAndFetch = async (url, i) => {
    await wait(i);
    const response = await fetch(url, {
        method: 'GET',
        headers: { 'api-key': key }
    });
    return response.json();
};
code_monk
  • 1
    This makes each request `wait` one second before `fetch`ing but all `requests` still begin at the same time. If you want to wait one second before *each* request, you could use a simple `for` loop. Something like `const result = []; for (const url of urls) { await wait(); result.push(await fetch(url)) } return result` – Mulan May 20 '21 at 22:12
  • Thank you, @Thankyou. I coded a fix by fixing the code – code_monk May 20 '21 at 22:19

I think the best thing to do is use a sleep function.

function sleep(ms = 300) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

await sleep();
Seppe Mariën

If you're using Node, there is a fairly lightweight npm package called bottleneck that will throttle requests. It works well and is widely used. Or you can look at how it works and roll your own.
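The core idea behind such a limiter can be hand-rolled in a few lines: schedule() wraps each job so that successive jobs start at least a minimum interval apart. This is an illustrative stand-in (makeLimiter is a made-up name, not bottleneck's actual implementation) -

```javascript
// A hand-rolled stand-in for the core idea of a rate limiter:
// schedule(fn) delays fn so that successive jobs start at least
// minTime milliseconds apart, while still returning fn's result.
const makeLimiter = minTime => {
  let next = 0
  return {
    schedule(fn) {
      const now = Date.now()
      const startAt = Math.max(now, next) // earliest allowed start
      next = startAt + minTime            // reserve the next slot
      return new Promise(r => setTimeout(r, startAt - now)).then(fn)
    }
  }
}
```

Usage then mirrors bottleneck's: `const limiter = makeLimiter(250); urls.map(url => limiter.schedule(() => fetch(url).then(res => res.json())))`.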

  • Worked great ` async function getReports(reportIDs) { const urls = reportIDs.map(id => 'https://api.data.com/api/v1/report/${id}/?include_datasets=true'); const requests = urls.map(url => limiter.schedule(()=>fetch(url, { method: 'GET', headers: { 'api-key': key } }).then(res => res.json()))); ` – Wells May 19 '21 at 00:05
  • Welcome to StackOverflow. [Unless a tag for a framework or library is also included, a pure JavaScript answer is expected](https://stackoverflow.com/tags/javascript/info). Posts that simply recommend a library can be written as a comment. If you are interested in how you might solve a problem like this on your own, please see the answer I provided. – Mulan May 20 '21 at 21:46
  • @Thank you for taking the time to provide a pure js solution for future readers. Your answer is excellent; lacking time to elucidate, I gave an immediate response which solved the problem (and pointed to an example) allowing the OP to move on to what was important. I agree, especially when not writing one-use utility code, that a few lines is preferable to using an entire library. I also agree that error handling, timeouts, and retry would be critical features for many applications. Sometimes it's important to teach, other times to point and move on; Thankfully, Stack facilitates both. – Daniel Westcott Jun 02 '21 at 17:53
  • Thanks Daniel. I agree with your points, I was only suggesting that your post was made as a comment :D – Mulan Jun 03 '21 at 00:06