I am currently writing a multiprocessing client for making rate-limited HTTP requests. For that I use a 'manager' Process that enforces the rate limit and dispatches queued requests to a Pool of workers.
For some reason, the queue never seems to be synchronized between the class, the manager, and the worker processes.
Code
from multiprocessing import JoinableQueue, Pool, Value
from multiprocessing.dummy import Process
from time import time
from typing import Callable
import requests
from requests import Request, Response
class HTTPWorkerPool:
    def __init__(self, requests: int, period: float, processes: int = None, daemon: bool = False):
        self._pool = Pool(processes=processes)
        self._queue = JoinableQueue()
        self.rps = Value('f', 0.0)
        self._running = Value('b', True)
        self._manager = Process(name='fastclient-manager', target=self._manager_,
                                args=(period, requests, self._pool, self._queue, self.rps, self._running))
        self._manager.daemon = True
        self._manager.start()

    def __del__(self):
        self.join()

    def _manager_(self, period, requests, pool, queue, rps, running):
        limited = False
        current_requests = 0
        last_clear = 0.0
        while running.value:
            if queue.empty():
                continue  # keep waiting for input. If join wasn't called, this will still be used.
            if current_requests >= requests:
                limited = True
            current_time = time()
            if last_clear + period <= current_time:
                rps.value = current_requests / (current_time - last_clear)
                last_clear = current_time
                limited = False
                current_requests = 0
            if not limited:
                print(f'in queue {queue.qsize()}')
                pool.apply_async(self._worker, queue)
                current_requests += 1

    def _worker(self, queue: JoinableQueue):
        (req, cb) = queue.get()
        cb(requests.Session().send(req.prepare()))
        queue.task_done()

    def join(self):
        self._queue.close()
        self._queue.join()
        self._running.value = False
        self._manager.join()
        self._pool.close()
        self._pool.terminate()
        self._pool.join()

    def submit(self, request: Request, callback: Callable[[Response], None]):
        self._queue.put((request, callback))
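One thing I am unsure about: _manager_ throws away the AsyncResult that apply_async returns, and as far as I know an exception raised by a task only resurfaces when get() is called on that result, so any dispatch error would be silently swallowed. A standalone sketch of what I mean (the toy work function is just for illustration, not my real worker):

from multiprocessing import Pool

def work(x):
    return x * 2

if __name__ == '__main__':
    with Pool(2) as pool:
        result = pool.apply_async(work, (21,))
        # get() blocks until the task finishes and re-raises any
        # exception the task hit; dropping the result hides errors.
        print(result.get(timeout=5))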
And for testing:
from time import sleep

from fastclient import HTTPWorkerPool
from requests import Request

def cb(res):
    print(res.text)

if __name__ == '__main__':
    pool = HTTPWorkerPool(10, 1)
    for _ in range(100):
        pool.submit(Request(method='GET', url='https://httpbin.org/get'), cb)
    for _ in range(10):
        sleep(1)
        print(pool.rps.value)
The output is a bunch of 100s (the queue length) and, every second, 9.9... (the requests per second). The queue length stays at 100 and never decreases.
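While debugging I started to suspect that the tasks never even reach the workers, because (if I read the docs correctly) a multiprocessing queue cannot be pickled and sent to a pool worker as an argument. A minimal check that I believe reproduces this, separate from my code above:

from multiprocessing import JoinableQueue, Pool

def touch(queue):
    queue.get()
    queue.task_done()

if __name__ == '__main__':
    queue = JoinableQueue()
    queue.put(1)
    with Pool(1) as pool:
        result = pool.apply_async(touch, (queue,))
        # I expect get() to raise:
        # RuntimeError: Queue objects should only be shared between
        # processes through inheritance
        result.get(timeout=5)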
Does someone know how I can properly synchronise the queue(s) so that the tasks actually get completed?
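Is the right approach to stop passing the queue through apply_async and instead hand it to each worker process once, via the Pool's initializer? A standalone sketch of the direction I mean (the names _init and _drain_one are mine, just for illustration):

from multiprocessing import JoinableQueue, Pool

_queue = None  # set once per worker process by the initializer

def _init(queue):
    global _queue
    _queue = queue

def _drain_one():
    item = _queue.get()
    print(f'got {item}')
    _queue.task_done()

if __name__ == '__main__':
    queue = JoinableQueue()
    for i in range(3):
        queue.put(i)
    with Pool(2, initializer=_init, initargs=(queue,)) as pool:
        for _ in range(3):
            pool.apply_async(_drain_one)
        queue.join()  # returns once task_done() was called for every item

I think this should drain the queue, but I am not sure it is the idiomatic way, or how it would interact with my rate-limiting manager.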