4

I'm making some API requests which are limited at 20 per second. As to get the answer the waiting time is about 0.5 secs I thought to use multiprocessing.Pool.map and using this decorator rate-limiting So my code looks like

def fun(vec):
        #do stuff

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args,**kargs):
            elapsed = time.clock() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait>0:
                time.sleep(leftToWait)
            ret = func(*args,**kargs)
            lastTimeCalled[0] = time.clock()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(20)
def multi(vec):
    p = Pool(5)   
    return p.map(f, vec)

I have 4 cores and this program works fine and there is an improvement in time compared to the loop version. Furthermore, when the Pool argument is 4,5,6 it works and the time is smaller for Pool(6) but when I use 7+ I got errors (Too many connections per second I guess).

Then if my function is more complicated and can do 1-5 requests the decorator doesn't work as expected. What else I can use in this case?

UPDATE

For anyone looking for use Pool remembers to close it otherwise you are going to use all the RAM

def multi(vec):
    p = Pool(5)   
    res=p.map(f, vec)
    p.close()
    return res

UPDATE 2

I found that something like this WebRequestManager can do the trick. The problem is that doesn't work with multiprocessing. Pool with 19-20 processes because the time is stored in the class you need to call when you run the request.

Pete Ythong
  • 305
  • 5
  • 13
rpanai
  • 12,515
  • 2
  • 42
  • 64

1 Answers1

4

Your indents are inconsistent up above which makes it harder to answer this, but I'll take a stab.

It looks like you're rate limiting the wrong thing; if f is supposed be limited, you need to limit the calls to f, not the calls to multi. Doing this in something that's getting dispatched to the Pool won't work, because the forked workers would each be limiting independently (forked processes will have independent tracking of the time since last call).

The easiest way to do this would be to limit how quickly the iterator that the Pool pulls from produces results. For example:

import collections
import time
def rate_limited_iterator(iterable, limit_per_second):
    # Initially, we can run immediately limit times
    runats = collections.deque([time.monotonic()] * limit_per_second)
    for x in iterable:
        runat, now = runats.popleft(), time.monotonic()
        if now < runat:
            time.sleep(runat - now)
        runats.append(time.monotonic() + 1)
        yield x

def multi(vec):
    with Pool(5) as p:
        return list(p.imap(f, rate_limited_iterator(vec, 20)))
ShadowRanger
  • 143,180
  • 12
  • 188
  • 271
  • Note: This will immediately dispatch the per second limit, rather than spacing evenly; if you need even spacing, you'd just adapt your existing code with a single stored time to the rate limiting iterator. – ShadowRanger Sep 21 '15 at 20:11
  • Thank you for the answer. You were right and my decorator was not working and even in your note above as increasing pool I reach the limit and get errors. So I should follow your suggestion and see. For same reasons using your example I use ~20% more time than with my code without decorator. – rpanai Sep 23 '15 at 00:57
  • Thanks for your nice answer. But actually it does not work for multiprocessing.map because it runs function after exhausts its iterable argument, transform the iterable argument to list. So you should use multiprocessing.imap fucntions for your rate limiter. – litao3rd Aug 30 '17 at 10:53
  • @litao3rd: Hmm... good point. That's aggravating. There's no need for `map` to work that way, especially if the `chunksize` is explicitly provided (removing the need to `list`ify unsized inputs to compute the default `chunksize`). I'll fix it to have the same behavior (return an ordered `list` of results) without otherwise changing the behavior. Code could use an update anyway (at this point, all supported versions of Python provide `time.monotonic` for instance, which is better than `time.time` for this usage). – ShadowRanger Apr 05 '23 at 15:28