17

I need to read a whole collection from MongoDB (the collection name is "test") in Python. I tried:

    self.__connection__ = Connection('localhost',27017)
    dbh = self.__connection__['test_db']            
    collection = dbh['test']

How can I read through the collection in chunks of 1000 (to avoid memory overflow, because the collection can be very large)?

Damir
  • Sorry to drop a URL at you, but I believe it is solved elegantly: http://code.activestate.com/recipes/137270-use-generators-for-fetching-large-db-record-sets/ – polve Mar 20 '12 at 12:34

6 Answers

10

Inspired by @Rafael Valero's answer, fixing the last-chunk bug in his code and making it more general, I created a generator function to iterate through a Mongo collection with a query and projection:

def iterate_by_chunks(collection, chunksize=1, start_from=0, query={}, projection={}):
    # Chunk boundaries: start_from, start_from + chunksize, ...
    chunks = range(start_from, collection.find(query).count(), int(chunksize))
    num_chunks = len(chunks)
    for i in range(1, num_chunks + 1):
        if i < num_chunks:
            yield collection.find(query, projection=projection)[chunks[i-1]:chunks[i]]
        else:
            # Last chunk: slice up to the total count so the tail is not lost
            yield collection.find(query, projection=projection)[chunks[i-1]:chunks.stop]

So, for example, you first create an iterator like this:

mess_chunk_iter = iterate_by_chunks(db_local.conversation_messages, 400, 0, query={}, projection=projection)

and then iterate it by chunks:

chunk_n = 0
total_docs = 0
for docs in mess_chunk_iter:
    chunk_n += 1
    chunk_len = 0
    for d in docs:
        chunk_len += 1
        total_docs += 1
    print(f'chunk #: {chunk_n}, chunk_len: {chunk_len}')
print("total docs iterated: ", total_docs)

chunk #: 1, chunk_len: 400
chunk #: 2, chunk_len: 400
chunk #: 3, chunk_len: 400
chunk #: 4, chunk_len: 400
chunk #: 5, chunk_len: 400
chunk #: 6, chunk_len: 400
chunk #: 7, chunk_len: 281
total docs iterated:  2681
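
Note that Cursor.count() was removed in PyMongo 4. Here is a minimal sketch of the same idea using count_documents() plus skip()/limit() instead (the name iterate_by_chunks_v2 and its defaults are just illustrative):

def iterate_by_chunks_v2(collection, chunksize=1000, start_from=0, query={}, projection=None):
    # count_documents() replaces the Cursor.count() used above,
    # which is gone in newer PyMongo versions.
    total = collection.count_documents(query)
    for skip in range(start_from, total, int(chunksize)):
        # skip()/limit() express the same window as the slice syntax above;
        # the final window simply returns fewer documents.
        yield collection.find(query, projection=projection).skip(skip).limit(int(chunksize))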
sashaostr
9

I agree with Remon, but you mention batches of 1000, which his answer doesn't really cover. You can set a batch size on the cursor:

cursor.batch_size(1000)

You can also skip records, e.g.:

cursor.skip(4000)

Is this what you're looking for? This is effectively a pagination pattern. However, if you're just trying to avoid memory exhaustion then you don't really need to set batch size or skip.
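
For example, something like this (a minimal sketch reusing the connection details from the question; the variable names are illustrative):

from pymongo import MongoClient

client = MongoClient('localhost', 27017)
collection = client['test_db']['test']

# batch_size() only controls how many documents each getMore round trip
# fetches; iterating the cursor still yields one document at a time,
# so memory use stays bounded by roughly one batch.
cursor = collection.find().batch_size(1000)
for document in cursor:
    pass  # process the document here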

Mick Sear
5

Use cursors. Cursors have a "batchSize" setting that controls how many documents are actually sent to the client per batch after doing a query. You don't have to touch this setting though, since the default is fine and the complexity of invoking "getmore" commands is hidden from you in most drivers. I'm not familiar with pymongo, but it works like this:

cursor = db.col.find() // Get everything!

while(cursor.hasNext()) {
    /* This will use the documents already fetched and if it runs out of documents in its local batch it will fetch another X of them from the server (where X is batchSize). */
    document = cursor.next();

    // Do your magic here
}
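
For reference, a rough pymongo equivalent of the pseudocode above (assuming db is a pymongo Database object); there is no explicit hasNext()/next() loop because the cursor is a plain Python iterator:

cursor = db.col.find()  # get everything
for document in cursor:
    # The driver transparently issues getMore commands, fetching the next
    # batch from the server whenever the local batch is exhausted.
    pass  # do your magic here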
Remon van Vliet
3

Here is a generic solution to iterate over any iterator or generator by batch:

def _as_batch(cursor, batch_size=50):
    # Iterate over something (pymongo cursor, generator, ...) by batch.
    # Note: the last batch may contain fewer than batch_size elements.
    batch = []
    try:
        while True:
            for _ in range(batch_size):
                batch.append(next(cursor))
            yield batch
            batch = []
    except StopIteration:
        if batch:
            yield batch

This will work as long as the cursor defines a __next__ method (i.e. we can use next(cursor)). Thus, we can use it on a raw cursor or on transformed records.

Examples

Simple usage:

for batch in _as_batch(db['coll_name'].find()):
    # do stuff

More complex usage (useful for bulk updates for example):

def update_func(doc):
    # dummy transform function
    doc['y'] = doc['x'] + 1
    return doc

query = (update_func(doc) for doc in db['coll_name'].find())
for batch in _as_batch(query):
    # do stuff

Reimplementation of the count() function:

sum(map(len, _as_batch( db['coll_name'].find() )))
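
And for the bulk-update case mentioned above, one possible sketch pairing _as_batch with bulk_write (ReplaceOne and the batch size of 500 are illustrative choices, not part of the original answer):

from pymongo import ReplaceOne

# Write each transformed batch back in a single bulk_write call.
query = (update_func(doc) for doc in db['coll_name'].find())
for batch in _as_batch(query, batch_size=500):
    db['coll_name'].bulk_write(
        [ReplaceOne({'_id': doc['_id']}, doc) for doc in batch]
    )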
Derlin
0

To create the initial connection (here in Python 2, using PyMongo):

host = 'localhost'
port = 27017
db_name = 'test_db'
collection_name = 'test'

To connect using MongoClient:

from pymongo import MongoClient

# Connect to MongoDB
client = MongoClient(host=host, port=port)
# Get handles to the specific DB and collection
dbh = client[db_name]
collection = dbh[collection_name]

From here on, the actual answer: I want to read in chunks (in this case of size 1000).

chunksize = 1000

For example, we can work out how many chunks of size chunksize we need.

# Some variables to create the chunks
total = dbh[collection_name].find(query).count()
skips_variable = range(0, total, int(chunksize))
if len(skips_variable) <= 1:
    skips_variable = [0, total]

Then we can retrieve each chunk.

for i in range(1, len(skips_variable)):

    # Expand the cursor and retrieve data
    data_from_chunk = dbh[collection_name].find(query)[skips_variable[i-1]:skips_variable[i]]

Where query in this case is query = {}.
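
Note (see the first comment below): as written, the loop stops at the last multiple of chunksize and can miss the tail of the collection. A minimal sketch of one way to include the final partial chunk, keeping the same dbh / collection_name / query names:

# Append the total count as the last boundary so the tail is included.
total = dbh[collection_name].find(query).count()
bounds = list(range(0, total, chunksize)) + [total]
for lower, upper in zip(bounds, bounds[1:]):
    data_from_chunk = dbh[collection_name].find(query)[lower:upper]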

Here I use similar ideas to create dataframes from MongoDB. Here I use something similar to write to MongoDB in chunks.

I hope it helps.

Rafael Valero
  • there is a bug in your code - you miss the last chunk. For example, if you have 98 docs relevant to the query you'll get 10 chunks of 10 docs, but the loop will retrieve only the first 90 while missing the last 8 – sashaostr Jan 20 '19 at 13:06
  • Having your chunks predefined like this would help and ensure you get it all, but also you are not re-using the cursor in your case which is not what I need. `chunk_iter = [[i,i + chunk_size] for i in range(0, cursor.count(), chunk_size)]` – radtek May 15 '20 at 01:17
0

Use itertools.islice() and a generator so as not to defeat MongoDB's laziness:

from itertools import islice

def batched(l, n):
    ll = iter(l)
    while (chunk := tuple(islice(ll, n))):
        yield chunk

chunk_size = int(5e5)
query = db_collection.find({}, trans_columns)
for chunk in batched(query, chunk_size):
    "Do something with the chunk..."

Note that itertools.batched() was added in Python 3.12.
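
On Python 3.12+ the standard-library version can be used directly (a minimal sketch reusing db_collection and trans_columns from above):

from itertools import batched  # Python 3.12+

chunk_size = int(5e5)
query = db_collection.find({}, trans_columns)
for chunk in batched(query, chunk_size):
    "Do something with the chunk..."  # each chunk is a tuple of documents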

ankostis