I'm trying to write a multiprocessing MongoDB utility. It works correctly, but I think I have a performance issue: even with 20 workers it isn't processing more than 2,800 docs per second, and I think it could be 5x faster. This is my code; it isn't doing anything exceptional, it just prints the remaining time to the end of the cursor.
Maybe there is a better way to perform multiprocessing on a MongoDB cursor (see the Pool sketch right after the code), because I need to run some work on every doc in a 17.4M-record collection, so performance and short runtime are a must.
import sys
import time
import multiprocessing

# MONGO (the pymongo client), CONFIG_POOL_SIZE and utils come from my own config/helpers.

START = time.time()

def remaining_time(a, b):
    # a = total number of docs, b = docs processed so far
    if START:
        y = (time.time() - START)
        z = ((a * y) / b) - y
        d = time.strftime('%H:%M:%S', time.gmtime(z))
        e = round(b / y)
        progress("{0}/{1} | Remaining time {2} ({3}p/s)".format(b, a, d, e), b, a)

def progress(p, c, t):
    pc = (c * 100) / t
    sys.stdout.write("%s [%-20s] %d%%\r" % (p, '█' * (pc / 5), pc))
    sys.stdout.flush()

def dowork(queue):
    # each worker consumes jobs until it reads its 'STOP' sentinel
    for p, i, pcount in iter(queue.get, 'STOP'):
        remaining_time(pcount, i)

def populate_jobs(queue):
    mongo_query = {}
    products = MONGO.mydb.items.find(mongo_query, no_cursor_timeout=True)
    if products:
        pcount = products.count()
        i = 1
        print "Processing %s products..." % pcount
        for p in products:
            try:
                queue.put((p, i, pcount))
                i += 1
            except Exception, e:
                utils.log(e)
                continue
    # one sentinel per worker, otherwise only one process ever stops
    for _ in range(CONFIG_POOL_SIZE):
        queue.put('STOP')

def main():
    queue = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=dowork, args=(queue,)) for _ in range(CONFIG_POOL_SIZE)]
    for p in procs:
        p.start()
    populate_jobs(queue)
    for p in procs:
        p.join()
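For reference, this is the kind of alternative I'm wondering about: letting a multiprocessing.Pool pull straight from the cursor and ship the docs to the workers in chunks. process_doc and the chunksize of 100 are placeholders, not something I've measured:

def process_doc(doc):
    # placeholder for whatever I actually need to run on each document
    pass

def main_with_pool():
    cursor = MONGO.mydb.items.find({}, no_cursor_timeout=True)
    pool = multiprocessing.Pool(CONFIG_POOL_SIZE)
    # chunksize batches the pickling between processes; 100 is an arbitrary guess
    for _ in pool.imap_unordered(process_doc, cursor, 100):
        pass
    pool.close()
    pool.join()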
Also, I've noticed that roughly every 2,500 documents the script pauses for about 0.5-1 seconds, which is obviously a problem. It looks like a MongoDB issue, because if I run exactly the same loop over a range(0, 1000000) the script doesn't pause at all and runs at 57,000 iterations per second, finishing in about 20 seconds. That's a huge difference from 2,800 MongoDB documents per second.
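Maybe those pauses line up with the cursor fetching its next batch from the server. A minimal sketch of what I mean by that, just forcing a bigger batch size on the cursor (5,000 is an arbitrary guess I haven't benchmarked):

# same find(), but asking the server for bigger batches so there are
# fewer getMore round trips while the Queue is being filled
products = MONGO.mydb.items.find(mongo_query, no_cursor_timeout=True).batch_size(5000)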
This is the code for the 1,000,000-iteration loop instead of the docs:
def populate_jobs(queue):
    mongo_query = {}
    products = MONGO.mydb.items.find(mongo_query, no_cursor_timeout=True)
    if products:
        pcount = 1000000
        i = 1
        print "Processing %s products..." % pcount
        # same loop as before, but iterating a plain range instead of the cursor
        for p in range(0, 1000000):
            queue.put((p, i, pcount))
            i += 1
    for _ in range(CONFIG_POOL_SIZE):
        queue.put('STOP')
UPDATE
As far as I can tell, the problem is not the multiprocessing itself, it's the cursor filling the Queue. That part doesn't run in multiprocessing mode; it's a single process that fills the Queue (the populate_jobs method). Maybe if I could make the cursor multithreaded/multiprocessed and fill the Queue in parallel, it would fill up faster and the multiprocessing dowork method would finish sooner, because I think the bottleneck is that I only put about 2,800 items per second into the Queue while the dowork processes could consume a lot more. But I don't know how to parallelize a MongoDB cursor.
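Something like this is what I have in mind: several producer processes, each iterating its own _id range of the collection and feeding the same Queue. The boundaries list is hypothetical (I'd still have to compute the split points somehow), and this simplified version puts raw docs without the progress counters:

def populate_range(queue, lower_id, upper_id):
    # one producer per _id range, all writing into the shared Queue
    query = {'_id': {'$gte': lower_id, '$lt': upper_id}}
    for doc in MONGO.mydb.items.find(query, no_cursor_timeout=True):
        queue.put(doc)

def populate_jobs_parallel(queue, boundaries):
    # boundaries is a sorted list of _id values defining contiguous ranges
    readers = []
    for lower, upper in zip(boundaries[:-1], boundaries[1:]):
        r = multiprocessing.Process(target=populate_range, args=(queue, lower, upper))
        r.start()
        readers.append(r)
    for r in readers:
        r.join()
    for _ in range(CONFIG_POOL_SIZE):
        queue.put('STOP')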
Maybe the problem is the latency between my computer and the MongoDB server: the round trip between asking for the next batch and MongoDB returning it would cut my throughput by roughly 20x (from 61,000 str/s to 2,800 doc/s). NOPE, I've tried against a localhost MongoDB and performance is exactly the same. This is driving me nuts.
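What I still want to measure is how fast the raw cursor is on its own, with no Queue and no workers, to see whether the driver or the Queue is the real limit. A minimal sketch of that test (the 100,000-doc sample is an arbitrary cut-off):

# time raw cursor iteration alone, no Queue and no workers,
# to see how many docs/s the driver can deliver by itself
start = time.time()
count = 0
for doc in MONGO.mydb.items.find({}, no_cursor_timeout=True):
    count += 1
    if count == 100000:
        break
elapsed = time.time() - start
print "%d docs in %.1fs (%.0f docs/s)" % (count, elapsed, count / elapsed)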