4

I am trying to make a multiprocessing MongoDB utility, it is perfectly working, but I think I have a performance issue... Even with 20 workers,it isn't processing more than 2800 docs per second... I think I can get 5x faster... This is my code, it isn't doing anything exceptional, just prints a remaining time to the end of the cursor.

Maybe there is a better way to perform multiprocessing on a MongoDB cursor, because I need to run some stuff on every doc with a 17.4M records collection, so performance and less time is a must.

START = time.time()
def remaining_time(a, b):
    if START:
        y = (time.time() - START)
        z = ((a * y) / b) - y
        d = time.strftime('%H:%M:%S', time.gmtime(z))
        e = round(b / y)
        progress("{0}/{1} | Tiempo restante {2} ({3}p/s)".format(b, a, d, e), b, a)


def progress(p, c, t):
    pc = (c * 100) / t
    sys.stdout.write("%s [%-20s] %d%%\r" % (p, '█' * (pc / 5), pc))
    sys.stdout.flush()

def dowork(queue):
    for p, i, pcount in iter(queue.get, 'STOP'):
        remaining_time(pcount, i)


def populate_jobs(queue):
    mongo_query = {}
    products = MONGO.mydb.items.find(mongo_query, no_cursor_timeout=True)
    if products:
        pcount = products.count()
        i = 1
        print "Procesando %s productos..." % pcount
        for p in products:
            try:
                queue.put((p, i, pcount))
                i += 1
            except Exception, e:
                utils.log(e)
                continue
    queue.put('STOP')


def main():
    queue = multiprocessing.Queue()

    procs = [multiprocessing.Process(target=dowork, args=(queue,)) for _ in range(CONFIG_POOL_SIZE)]

    for p in procs:
        p.start()

    populate_jobs(queue)

    for p in procs:
        p.join()

Also, I've noticed that about every 2500 aprox documents, script pauses for about .5 - 1 secs which is obviously a bad issue. This is a MongoDB problem becase if I do the exactly same loop but using a range(0, 1000000) script doesn't pause at all and runs at 57,000 iterations per second, with a total of 20 seconds to end the script... Huge difference from 2,800 MongoDB documents per second...

This is the code to run a 1,000,000 iteration loop instead docs.

def populate_jobs(queue):
    mongo_query = {}
    products = MONGO.mydb.items.find(mongo_query, no_cursor_timeout=True)
    if products:
        pcount = 1000000
        i = 1
        print "Procesando %s productos..." % pcount
        for p in range(0, 1000000):
            queue.put((p, i, pcount))
            i += 1
    queue.put('STOP')

UPDATE As I saw, the problem is not the multiprocessing itself, is the cursor filling the Queue which is not running in multiprocessing mode, it is one simple process that fills the Queue (populateJobs method) maybe if I could make the cursor multithread/multirpocess and fill the Queue in parallel it will be filled up faster, then the multiprocessing method dowork will do faster, because I think there's a bottleneck where I only fill about 2,800 items per second in Queue and retrieving a lot more in dowork multiprocess, but I don't know how can I parallelize MongoDB cursor.

Maybe, the problem is the latency between my computer and the server's MongoDB. That latency, between me asking for next cursor and MongoDB telling me which is, reduces my performance by 2000% (from 61,000 str/s to 2,800 doc/s) NOPE I've tried on a localhost MongoDB and performance is exactly the same... This is driving me nuts

Robert W. Hunter
  • 2,895
  • 7
  • 35
  • 73
  • " I think I can get 5x faster." Why do you think so? –  Apr 20 '15 at 10:57
  • 1
    Because I can perform 8000 insertions per seconds to MongoDB, which requires more effort than a simple find and it's doing 2800 docs per seconds, so I think I can get more perfomrnance on find. – Robert W. Hunter Apr 20 '15 at 11:04
  • How many jobs are you sending through the `Queue`? If it's a large number, you may get improved performance if you use a `multiprocessing.Pool`, which can batch up the jobs before sending them to the child processes, assuming you use `Pool.map` to send the work to the children. You could also use `Pool.imap` with a fairly large `chunksize`, if you want to avoid having the entire list of tasks in memory. – dano Apr 20 '15 at 13:14
  • The number is variable but more than 1,000,000 for sure, see my last update because the speed problem is in a certain point – Robert W. Hunter Apr 20 '15 at 13:28
  • @dano I just used a `multiprocesing.Pool` once with `async`, could yo post an example taking by reference the code in my question? I'm a bit lost on how feed childs this way.. – Robert W. Hunter Apr 20 '15 at 16:50
  • @RobertW.Hunter I've added an answer, but I'm not sure it's going to make any difference based on your update to the question. – dano Apr 20 '15 at 17:25

2 Answers2

5

Here's how you can use a Pool to feed the children:

START = time.time()
def remaining_time(a, b):
    if START:
        y = (time.time() - START)
        z = ((a * y) / b) - y
        d = time.strftime('%H:%M:%S', time.gmtime(z))
        e = round(b / y)
        progress("{0}/{1} | Tiempo restante {2} ({3}p/s)".format(b, a, d, e), b, a)


def progress(p, c, t):
    pc = (c * 100) / t
    sys.stdout.write("%s [%-20s] %d%%\r" % (p, '█' * (pc / 5), pc))
    sys.stdout.flush()

def dowork(args):
    p, i, pcount  = args
    remaining_time(pcount, i)

def main():
    queue = multiprocessing.Queue()

    procs = [multiprocessing.Process(target=dowork, args=(queue,)) for _ in range(CONFIG_POOL_SIZE)]
    pool = multiprocessing.Pool(CONFIG_POOL_SIZE)
    mongo_query = {}
    products = MONGO.mydb.items.find(mongo_query, no_cursor_timeout=True)
    pcount = products.count()
    pool.map(dowork, ((p, idx, pcount) for idx,p in enumerate(products)))
    pool.close()
    pool.join()

Note that using pool.map requires loading everything from the cursor into memory at once, though, which might be a problem because of how large it is. You can use imap to avoid consuming the whole thing at once, but you'll need to specify a chunksize to minimize IPC overhead:

# Calculate chunksize using same algorithm used internally by pool.map
chunksize, extra = divmod(pcount, CONFIG_POOL_SIZE * 4)
if extra:
   chunksize += 1

pool.imap(dowork, ((p, idx, pcount) for idx,p in enumerate(products)), chunksize=chunksize)
pool.close()
pool.join()

For 1,000,000 items, that gives a chunksize of 12,500. You can try sizes larger and smaller than that, and see how it affects performance.

I'm not sure this will help much though, if the bottleneck is actually just pulling the data out of MongoDB.

dano
  • 91,354
  • 19
  • 222
  • 219
  • With your method I get a speed of 60,000 per second, absolutely great actually, I will do more tests to se if that number is correct or not, but one question, using your method, how can I for example, inside `dowork` update an array and then when `i % n ==0` pull the array (actually I'm going to do a bulk insert on MongoDB) because, if I'm not wrong using a `multiprocessing.Pool` there aren't n processes summoned, so `dowork` method is not iterating over anything so I can push/pull to an external array (external to the iteration itself) I hope I've explained it correctly – Robert W. Hunter Apr 21 '15 at 09:07
  • And also, this way, process bar info is not showing ordered results, it can say that `392093/1000000 = 39%` and then it says `3/1000000 = 0.003%` and then `984333/10...` and so on.. Unlike `multiprocessing.Process` which printed the exact percentage for each invoked process... – Robert W. Hunter Apr 21 '15 at 09:40
  • @RobertW.Hunter For your second question, yes, that's the downside of using chunking. It makes performance way better, but your progress meter is going to get screwed up because you're less likely to be process items in the order your originally sent them into the Pool. There are ways to fix it (you could use a shared counter variable), but it will hurt your performance to synchronize access to it. Another idea would be to handle the progress meter stuff in the parent process, so that progress is updated as each task completes. This would avoid slowing down the workers. – dano Apr 21 '15 at 13:21
  • @RobertW.Hunter For your first question - Are you saying you want to save some data on each run of `dowork`, and then after enough data has been saved, run a DB operation using it? There are a few ways to do it; one would be to use a `multiprocessing.Manager` to create a list shared between all the workers. This will probably hurt performance quite a bit. The other would be to use the `initializer` argument of the `Pool` constructor to create a global counter/list in each worker process. Then each worker could keep track of its own list of items...(continued) – dano Apr 21 '15 at 13:24
  • Well thank you @dano for your answers I will make it as the correct one because that was my original question. I will try your comments and see if I can handle it, and yes, I want to save some data on each run and then do a bulk insert into MongoDB because the speed difference compared to insert one by one is incredibly huge as there are less I/O operations on the server. – Robert W. Hunter Apr 21 '15 at 13:36
  • ...and run its own DB update when the count gets high enough. The tricky part would be making sure the last items were written to the DB prior to the pool exiting. You would probably need to add an extra `map` call to a function that would do one last bulk insert for each process, and use some synchronization mechanism to make sure each process in the `Pool` does the update. – dano Apr 21 '15 at 13:37
  • @RobertW.Hunter Another idea would be to use your original `Queue` technique, but send chunks of the list through the `Queue` each time, just like the `Pool` does. Then you could keep your original approach more intact, and just add logic to iterate over each entry in the chunk sent via the `Queue`. – dano Apr 21 '15 at 13:39
  • @RobertW.Hunter Actually, see [this answer](http://stackoverflow.com/a/24724452/2073595) for a way to run a function in each worker process prior to it exiting. – dano Apr 21 '15 at 13:46
1

Why are you using multiprocessing? You don't seem to be doing actual work in other threads using the queue. Python has a global interpreter lock which makes multithreaded code less performant than you'd expect. It's probably making this program slower, not faster.

A couple performance tips:

  1. Try setting batch_size in your find() call to some big number (e.g. 20000). This is the maximum number of documents returned at a time, before the client fetches more, and the default is 101.

  2. Try setting cursor_type to pymongo.cursor.CursorType.EXHAUST, which might reduce the latency you're seeing.

Community
  • 1
  • 1
paulmelnikow
  • 16,895
  • 8
  • 63
  • 114
  • As I said, I'm doing work on each process, but just to test the speed of iterations I'm just printing some statistics, but the real code does real work. I will try your tips. Just for update, I'm not doing multi threading but Multi Processing, because I need to kill process at some point if I want – Robert W. Hunter Apr 20 '15 at 11:46
  • @noa the GIL doesn't apply to [multiprocessing](https://docs.python.org/2/library/multiprocessing.html) - if I got your statement right, that's what you're implying – miraculixx Apr 20 '15 at 11:53
  • After trying your tips, performance is the same. I can't understand why a simple range loop is ~61000 items/second but cursor is ~2800 items/second... – Robert W. Hunter Apr 20 '15 at 14:31
  • Without iterating over the result you're only loading the first batch of records from the database. Obviously it will take longer to actually load all the data. That said, it's probably worth removing the multiprocessing stuff to get a clearer benchmark and a simpler test case. – paulmelnikow Apr 20 '15 at 14:40