24

I have a couple of items that I need to query a 3rd party API for and said API has a call limit of 5 calls per second. I need to somehow throttle my calls to the API to a maximum of 5 calls per second.

So far I've just used Promise.all() on an array of promises, where each promise sends a request to the API and resolves when the API responds with the HTTP status code 200 and rejects when it responds with some other status code. However, when I have more than 5 items in the array, I risk exceeding the limit, causing the Promise.all() call to reject.

How can I limit the Promise.all() call to 5 calls per second?
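
For reference, a minimal sketch of the pattern described above (`apiCallForItem` is a stand-in for the real request):

```javascript
// Minimal sketch of the current approach: one promise per item, all
// requests fired at once via Promise.all - nothing limits the rate.
const items = [1, 2, 3, 4, 5, 6, 7];

// stand-in for the real API request
const apiCallForItem = item => Promise.resolve(`result ${item}`);

// All seven requests start immediately, exceeding 5 calls per second.
Promise.all(items.map(apiCallForItem)).then(results => {
  console.log(results.length); // 7
});
```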

Nazim Kerimbekov
Hallur A.
  • post some code too – quirimmo Dec 27 '18 at 16:50
  • A promise is like the shadow of a request - one that has already been sent. With that in mind, there is no way to throttle requests once you already have the promises. Check the library you use for requests - it may already have a feature to throttle them – skyboyer Dec 27 '18 at 16:57
  • You cannot do anything with `Promise.all`, as at that stage all the promises are already created and all tasks already running. `Promise.all` doesn't "call" anything; it waits for existing things. You need to throttle the API calls in the first place, in the loop where you are creating your promises. – Bergi Dec 27 '18 at 17:10
  • Have a look at https://stackoverflow.com/a/39197252/1048572 and https://stackoverflow.com/a/38778887/1048572 – Bergi Dec 27 '18 at 17:11
  • Does this answer your question? [Throttle amount of promises open at a given time](https://stackoverflow.com/questions/38385419/throttle-amount-of-promises-open-at-a-given-time) – Jeff Bowman Aug 23 '21 at 21:26

8 Answers

11

Using ES6 without libraries

export async function asyncForEach(array, callback) {
  for (let index = 0; index < array.length; index++) {
    await callback(array[index], index, array);
  }
}
export function split(arr, n) {
  var res = [];
  while (arr.length) {
    res.push(arr.splice(0, n));
  }
  return res;
}
export const delayMS = (t = 200) => {
  return new Promise(resolve => {
    setTimeout(() => {
      resolve(t);
    }, t);
  });
};
export const throttledPromises = (
  asyncFunction,
  items = [],
  batchSize = 1,
  delay = 0
) => {
  return new Promise(async (resolve, reject) => {
    const output = [];
    const batches = split(items, batchSize);
    await asyncForEach(batches, async (batch) => {
      const promises = batch.map(asyncFunction).map(p => p.catch(reject));
      const results = await Promise.all(promises);
      output.push(...results);
      await delayMS(delay);
    });
    resolve(output);
  });
};
Adel
  • Of all the solutions I tried this one worked superbly. Thanks! – Zacho Apr 29 '20 at 22:05
  • Note this doesn't actually work as intended because it won't start the next batch of requests until the first batch is completely finished. I've put a typescript version of this as an answer below – JohnFlux Mar 01 '23 at 23:41
  • Also this reply doesn't pass the real index to the callback, just the index inside the batch. I've fixed in my answer. – JohnFlux Mar 02 '23 at 00:06
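
A condensed, self-contained sketch of the same batch-and-delay idea, with an illustrative usage call (`fakeApiCall`, the batch size, and the delay are assumptions for the example):

```javascript
// Condensed sketch of the batch-and-delay approach: split items into
// chunks, Promise.all each chunk, then pause before the next one.
const delayMS = (t = 200) => new Promise(resolve => setTimeout(resolve, t));

const split = (arr, n) => {
  const out = [];
  for (let i = 0; i < arr.length; i += n) out.push(arr.slice(i, i + n));
  return out;
};

async function throttledPromises(asyncFunction, items, batchSize = 1, delay = 0) {
  const output = [];
  for (const batch of split(items, batchSize)) {
    const results = await Promise.all(batch.map(asyncFunction));
    output.push(...results);
    await delayMS(delay);
  }
  return output;
}

// fakeApiCall stands in for the real API request.
const fakeApiCall = item => Promise.resolve(item * 2);

// 5 calls per batch, with a pause between batches:
throttledPromises(fakeApiCall, [1, 2, 3, 4, 5, 6, 7], 5, 100)
  .then(results => console.log(results)); // [ 2, 4, 6, 8, 10, 12, 14 ]
```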
7

I hope this helps.

Note that this uses Promise.all to resolve all requests, so with a large list of queries it waits for every response before returning, which can mean a lot of waiting in your code. Also, if any single request rejects, Promise.all rejects.

If you don't need all the results together, I suggest using something else, such as lodash's debounce or throttle, or a framework that handles this.

let items = [
    {name: 'item1'}, 
    {name: 'item2'}, 
    {name: 'item3'}, 
    {name: 'item4'}, 
    {name: 'item5'}, 
    {name: 'item6'}
];

// This is the api request that you send and return a promise
function apiCall(item) {
  return new Promise((resolve) => {
    setTimeout(() => resolve(item.name), 1000);
  })
}

new Promise((resolve) => {
  let results = [];

  function sendReq (itemsList, iterate, apiCall) {
    setTimeout(() => {
      // slice itemsList to send request according to the api limit
      let slicedArray = itemsList.slice(iterate * 5, (iterate * 5 + 5));
      const result = slicedArray.map(item => apiCall(item));
      results = [...results, ...result];

      // This will resolve the promise when reaches to the last iteration
      if (iterate === Math.ceil(items.length / 5) - 1) {
          resolve(results);
      }
    }, (1000 * iterate)); // every 1000ms runs (api limit of one second)
  }

  // This will make iteration to split array (requests) to chunks of five items 
  for (let i = 0; i < Math.ceil(items.length / 5); i++) {
    sendReq(items, i, apiCall);
  }
}).then(Promise.all.bind(Promise)).then(console.log);
// Use Promise.all to wait for all requests to resolve
// To use it this way binding is required
amirhaa
5

You can use the concurrency option in bluebird if you're not too worried about resolving promises sequentially.

The below would process 5 queries at a time only.

const Promise = require('bluebird');

const buildQueries = (count) => {
  let queries = [];

  for(let i = 0; i < count; i++) {
    queries.push({user: i});
  };

  return queries;
};

const apiCall = (item) => Promise.delay(1000).then(() => item.user);

const queries = buildQueries(20);

Promise.map(queries, async query => {
  console.log( await apiCall(query) );
}, {concurrency: 5});
nullspace
3

Maybe I'm simple-minded, but I wrote this version that just divides the incoming array into chunks of 5 promises each and does Promise.all() on each block:

utility.throttledPromiseAll = async (promises) => {
  const MAX_IN_PROCESS = 5;
  const results = new Array(promises.length);

  async function doBlock(startIndex) {
    // Shallow-copy a block of promises to work on
    const currBlock = promises.slice(startIndex, startIndex + MAX_IN_PROCESS);
    // Await the completion. If any fail, it will throw and that's good.
    const blockResults = await Promise.all(currBlock);
    // Assuming all succeeded, copy the results into the results array
    for (let ix = 0; ix < blockResults.length; ix++) {
      results[ix + startIndex] = blockResults[ix];
    }
  }

  for (let iBlock = 0; iBlock < promises.length; iBlock += MAX_IN_PROCESS) {
    await doBlock(iBlock);
  }
  return results;
};
Yves M.
Eric Hill
  • Not simple minded! And there's a certain merit to simple solutions. Keep in mind though that the script does not progress to the next block until the _whole_ block is finished. – wybe Jun 30 '21 at 08:25
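
The comment above is right that each block waits for its slowest request. A minimal, hypothetical sketch of a worker-pool alternative that keeps up to `limit` tasks in flight and starts the next one as soon as any finishes (`pool` is an assumed name, not from the answer):

```javascript
// Worker-pool sketch: `tasks` is an array of functions that each return
// a promise. At most `limit` run concurrently; a finished task is
// replaced immediately instead of waiting for a whole block.
async function pool(tasks, limit) {
  const results = new Array(tasks.length);
  let next = 0;

  async function worker() {
    while (next < tasks.length) {
      const i = next++; // claim the next task index (single-threaded, so safe)
      results[i] = await tasks[i]();
    }
  }

  const workers = Array.from(
    { length: Math.min(limit, tasks.length) },
    worker
  );
  await Promise.all(workers);
  return results;
}

// e.g. pool(items.map(item => () => apiCall(item)), 5)
```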
2

I think you can break your problem into two parts: make no more than 5 calls at the same time, and ensure that the newest call does not start until at least 1 second after the oldest.

The first part is easy to solve with the excellent p-limit library - it has by far the simplest interface I have seen.

For the second part you need to actually track when each call started - i.e. implement a wait function. Basic pseudo-code, untested:

import pLimit from 'p-limit';
const apiLimit = pLimit(5);

const startTimes = [];

async function rateLimiter(item) {
  const lastSecond = (new Date().getTime()) - 1000;
  if (startTimes.filter(v => v > lastSecond).length >= 5) {
    await new Promise(r => setTimeout(r, 1000));
  }
  // TODO: cleanup startTimes to avoid memory leak
  startTimes.push(new Date().getTime());
  return apiCall(item);
}

await Promise.all(items.map(v => apiLimit(() => rateLimiter(v))))
Yuri Astrakhan
  • To improve upon this answer, `p-queue` has interval and intervalCap built-in which allows you to do this with a much simpler API. Thanks for the recommendation, it allowed me to find this module. – neojp Jan 13 '21 at 00:44
  • p-throttle is probably closer to the issue, although p-limit, p-queue and p-throttle all have the ESM issue to solve – hpfs Dec 15 '22 at 02:59
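
Along the lines of the p-queue comment, the rolling-window idea can also be sketched without a library. `createRateLimiter` is a hypothetical helper, untested against a real API:

```javascript
// Dependency-free sketch of a rolling-window rate limiter: allow at
// most `cap` call starts within any `interval` milliseconds.
function createRateLimiter(cap, interval) {
  const startTimes = []; // timestamps of recent call starts, oldest first

  return async function run(fn) {
    for (;;) {
      const now = Date.now();
      // forget start times that have left the window
      while (startTimes.length > 0 && startTimes[0] <= now - interval) {
        startTimes.shift();
      }
      if (startTimes.length < cap) break;
      // wait until the oldest start time leaves the window, then re-check
      await new Promise(resolve =>
        setTimeout(resolve, startTimes[0] + interval - now)
      );
    }
    startTimes.push(Date.now());
    return fn();
  };
}

// e.g. at most 5 calls per second:
// const limited = createRateLimiter(5, 1000);
// Promise.all(items.map(item => limited(() => apiCall(item))));
```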
1

We can use a generator to send the list of promises in groups. Once the first yield resolves we can do another yield. We store the results in an array; once its length equals the promiseArray length, we resolve the wrapping Promise.

const fetch = require("isomorphic-fetch");
const totalPromiseLength = 5;
const requestMethod = url => () => fetch(url).then(response => response.json());
let promiseArray = [...new Array(totalPromiseLength).keys()].map(index =>
  requestMethod("https://jsonplaceholder.typicode.com/todos/" + (index + 1))
);
function* chunks(arr, limit) {
  for (let i = 0; i < Math.ceil(arr.length / limit); ++i) {
    yield [...arr].slice(i * limit, i * limit + limit);
  }
}

new Promise(async resolve => {
  let generated = chunks(promiseArray, 2);
  let result = [];
  for (let bla of generated) {
    await Promise.all(bla.map(param => param())).then(response => {
      result = [...result, ...response];
      if (result.length === promiseArray.length) {
        resolve(result);
      }
    });
  }
}).then(response => {
  console.log(response);
});

1

Here's Adel's answer, but with TypeScript types, and also fixing the index and array being passed to the callback:

async function asyncForEach<T>(array: T[], callback: (item: T, index: number, array: T[]) => Promise<void>) {
  for (let index = 0; index < array.length; index++) {
    await callback(array[index] as T, index, array);
  }
}
function split<T>(arr: T[], n: number): T[][] {
  const res: T[][] = [];
  while (arr.length) {
    res.push(arr.splice(0, n));
  }
  return res;
}
const delayMS = (t = 200) => {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve(t);
    }, t);
  });
};

/**
 * Say you want to call 'parse' on 5 values, but run a maximum of 2 at a time, with 100ms delay between each batch.   Call like:
 *
 * throttlePromises(async (values) => await parse(values), ['1','2','3','4','5'], 2, 100)
 */
export function throttledPromises<T, R>(
  asyncFunction: (item: T, index: number, array: T[]) => Promise<R>,
  items: T[],
  batchSize = 1,
  delay = 0
): Promise<(Awaited<R> | void)[]> {
  return new Promise(async (resolve, reject) => {
    const output: (Awaited<R> | void)[] = [];
    const batches = split(items, batchSize);
    await asyncForEach(batches, async (batch, batchNumber) => {
      const promises = batch
        .map((item, innerIndex) => asyncFunction(item, batchNumber * batchSize + innerIndex, items))
        .map((p) => p.catch(reject));
      const results = await Promise.all(promises);
      output.push(...results);
      if (delay) {
        await delayMS(delay);
      }
    });
    resolve(output);
  });
}

JohnFlux
0

I recently refactored the vanilla ES6 solution I've been using for the couple of years into TypeScript, and it's been working great.

  • Promise objects are instantiated as needed
  • Results are yielded on state change
  • Lean on native PromiseSettledResult typings
  • Looks a bit like Array.prototype.reduce() in usage

throttle.ts

export enum PromiseState {
  Pending = 'pending',
  Fulfilled = 'fulfilled',
  Rejected = 'rejected',
}

function getPromiseState( promise: Promise<any> ): Promise<PromiseState> {
  const control = Symbol();

  return Promise
    .race([ promise, control ])
    .then( value => ( value === control ) ? PromiseState.Pending : PromiseState.Fulfilled )
    .catch( () => PromiseState.Rejected );
}

export function isFulfilled<T>( promise: PromiseSettledResult<T> ): promise is PromiseFulfilledResult<T> {
  return promise.status === "fulfilled";
}

export function isRejected<T>( promise: PromiseSettledResult<T> ): promise is PromiseRejectedResult {
  return promise.status === "rejected";
}

export async function* throttle<InputType, OutputType>( reservoir: InputType[], promiseFn: ( args: InputType ) => Promise<OutputType>, concurrencyLimit: number ): AsyncGenerator<PromiseSettledResult<OutputType>[], void, PromiseSettledResult<OutputType>[] | undefined> {
  let iterable = reservoir.splice( 0, concurrencyLimit ).map( args => promiseFn( args ) );

  while ( iterable.length > 0 ) {
    await Promise.race( iterable );

    const pending: Promise<OutputType>[] = [];
    const resolved: Promise<OutputType>[] = [];

    for ( const currentValue of iterable ) {
      if ( await getPromiseState( currentValue ) === PromiseState.Pending ) {
        pending.push( currentValue );
      } else {
        resolved.push( currentValue );
      }
    }

    iterable = [
      ...pending,
      ...reservoir.splice( 0, concurrencyLimit - pending.length ).map( args => promiseFn( args ) )
    ];

    yield Promise.allSettled( resolved );
  }
}

Usage example:

app.ts

import { throttle, isFulfilled, isRejected } from './throttle';

async function timeout( delay: number ): Promise<string> {
  return new Promise( resolve => {
    setTimeout( () => resolve( `timeout promise with ${ delay } delay resolved` ), delay );
  } );
}

const inputArray: number[] = [ 1200, 1500, 1400, 1300, 1000, 1100, 1200, 1500, 1400, 1300, 1000, 1100 ];

( async () => {

  const timeoutPromises = await throttle<number, string>( inputArray, async item => {
    const result = await timeout( item );

    return `${ result } and ready for for..await..of`;
  }, 5 );

  const messages: string[] = [];

  for await ( const chunk of timeoutPromises ) {
    console.log( chunk.filter( isFulfilled ).map( ({ value }) => value ) );

    console.error( chunk.filter( isRejected ).map( ({ reason }) => reason ) );
  }

})();
Andrew Odri