3

I have a (900k, 300) record set in a MongoDB collection. When I try to read the data into pandas, memory consumption increases dramatically until the process is killed. I should mention that the data fits in memory (~1.5 GB) when I read it from a CSV file.

My machine has 32 GB of RAM and 16 CPUs, running CentOS 7.

My simple code:

from pymongo import MongoClient
import pandas as pd

client = MongoClient(host, port)
collection = client[db_name][collection_name]
cursor = collection.find()
df = pd.DataFrame(list(cursor))  # materializes all 900k documents at once

My multiprocessing code:

import concurrent.futures
import multiprocessing

def read_mongo_parallel(skipses):
    # skipses = (skip, limit, db_name, collection_name, host, port)
    print("Starting process")
    client = MongoClient(skipses[4], skipses[5])
    db = client[skipses[2]]
    collection = db[skipses[3]]
    print("range of {} to {}".format(skipses[0], skipses[0] + skipses[1]))

    cursor = collection.find().skip(skipses[0]).limit(skipses[1])

    return list(cursor)

all_lists = []
with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
    for rows in executor.map(read_mongo_parallel, skipesess):
        all_lists.extend(rows)

df = pd.DataFrame(all_lists)

Memory usage increases with both methods until the kernel is killed.

What am I doing wrong?

MPA
  • All of the recommendations to read data in chunks are good but don't change the fact that pandas is memory hungry, and requiring that one huge dataframe in memory may just always be problematic with your data shape/size. Can you operate on parts of the data instead? Can you limit the width of the data you are reading in? Can you ditch pandas altogether? – totalhack Feb 10 '20 at 21:14
  • Actually, the list of dictionaries generated by list(cursor) consumes most of the memory. Once the df is created, its memory drops to 2 GB, while the list of dictionaries takes more than 32 GB, and I only have 32 GB. Now I am trying to investigate why the dictionaries are so memory hungry. – MPA Feb 10 '20 at 21:39
  • You said it fits into memory if read from a CSV. Did you mean that read from a CSV into a DataFrame it was only 1.5 GB, or read from a CSV into rows of dicts? – totalhack Feb 10 '20 at 21:45
  • Yes, but I mean that if I read in chunks from mongo into a df it's only ~1.5 GB, but if I read from mongo in chunks and keep the list of dicts, the memory exceeds the limit. – MPA Feb 10 '20 at 22:03
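
Illustrating the comment above about limiting the width of the data: a minimal sketch that uses a MongoDB projection so the server only returns the columns that are actually needed. The connection names come from the question; the field names are hypothetical.

from pymongo import MongoClient
import pandas as pd

client = MongoClient(host, port)
collection = client[db_name][collection_name]

# Only request the fields you need; this shrinks both the documents on the
# wire and the resulting DataFrame. The field names below are placeholders.
projection = {"_id": False, "article_id": True, "feature_1": True, "feature_2": True}
cursor = collection.find({}, projection)
df = pd.DataFrame(list(cursor))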

6 Answers

13

The problem is the use of list when you build the DataFrame: the cursor is consumed all at once, producing a list with 900k dictionaries in it, which takes a lot of memory.

You can avoid that if you create an empty DataFrame and then pull the documents in batches, a few documents at a time, appending them to the DataFrame.

def batched(cursor, batch_size):
    # Yield documents from the cursor in lists of batch_size,
    # so only one batch has to be held in memory at a time.
    batch = []
    for doc in cursor:
        batch.append(doc)
        if len(batch) == batch_size:
            yield batch
            batch = []

    if batch:   # last, partial batch
        yield batch

df = pd.DataFrame()
for batch in batched(cursor, 10000):
    # Note: DataFrame.append was removed in pandas 2.0; use pd.concat there.
    df = df.append(batch, ignore_index=True)

10000 seems like a reasonable batch size, but you may want to change it according to your memory constraints: the higher it is, the faster this will end, but also the more memory it will use while running.

UPDATE: Adding some benchmarks

Note that this approach does not necessarily make the query take longer, but rather the opposite, since what actually takes time is pulling the documents out of MongoDB as dictionaries and allocating them into a list.

Here are some benchmarks with 300K documents that show how this approach, with the right batch_size, is actually even faster than pulling the whole cursor into a list:

  • The whole cursor into a list
%%time

df = pd.DataFrame(list(db.test.find().limit(300000)))

CPU times: user 35.3 s, sys: 2.14 s, total: 37.5 s Wall time: 37.7 s

  • batch_size=10000 <- FASTEST
%%time

df = pd.DataFrame()
for batch in batched(db.test.find().limit(300000), 10000):
    df = df.append(batch, ignore_index=True)

CPU times: user 29.5 s, sys: 1.23 s, total: 30.7 s Wall time: 30.8 s

  • batch_size=1000
%%time

df = pd.DataFrame()
for batch in batched(db.test.find().limit(300000), 1000):
    df = df.append(batch, ignore_index=True)

CPU times: user 44.8 s, sys: 2.09 s, total: 46.9 s Wall time: 46.9 s

  • batch_size=100000
%%time

df = pd.DataFrame()
for batch in batched(db.test.find().limit(300000), 100000):
    df = df.append(batch, ignore_index=True)

CPU times: user 34.6 s, sys: 1.15 s, total: 35.8 s Wall time: 36 s

Carles Sala
  • I do not think there is any way to make this faster, @MPA, as what actually takes long is pulling the documents from MongoDB into a list. I updated the answer adding some benchmarks, where you can see that using the right `batch_size` makes this process even faster than not using batching at all. – Carles Sala Feb 09 '20 at 17:34
  • Now on a matrix size of 700K×300 it takes 8 minutes, and with multiprocessing it takes 2 minutes, but at 800k it exceeds the memory limit. – MPA Feb 10 '20 at 10:21
  • Actually your solution took 416.47 seconds, so you got the bounty by mistake :). Enjoy! – MPA Feb 11 '20 at 12:43
  • Well, I'm glad I got it, though I'd prefer it not being by mistake :-/ – Carles Sala Feb 11 '20 at 16:23
  • One thing you can do is combine my solution with the multiprocessing one: put my proposal inside a function and then spread it across multiple processes, and at the end just concatenate the returned DataFrames. Though bear in mind that you then have to account for the memory being used concurrently in each process, so you might need to lower the batch_size. – Carles Sala Feb 11 '20 at 16:26
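
A minimal sketch of the combination described in the last comment, reusing the batched() generator from this answer inside each worker. The connection names (host, port, db_name, collection_name) and the 900k document count come from the question; read_range and the range-splitting logic are hypothetical glue code.

import concurrent.futures
import multiprocessing
import pandas as pd
from pymongo import MongoClient

def read_range(args):
    # Each worker reads its own (skip, limit) slice of the collection and
    # builds a partial DataFrame in batches to cap its memory usage.
    skip, limit = args
    client = MongoClient(host, port)
    cursor = client[db_name][collection_name].find().skip(skip).limit(limit)
    part = pd.DataFrame()
    for batch in batched(cursor, 10000):   # batched() as defined above
        part = part.append(batch, ignore_index=True)
    return part

n_workers = multiprocessing.cpu_count()
step = 900000 // n_workers + 1
ranges = [(start, step) for start in range(0, 900000, step)]

with concurrent.futures.ProcessPoolExecutor(max_workers=n_workers) as executor:
    df = pd.concat(executor.map(read_range, ranges), ignore_index=True)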
1

This test harness creates 900k (albeit small) records and runs fine on my stock laptop. Give it a try.

import pymongo
import pandas as pd

db = pymongo.MongoClient()['mydatabase']
db.mycollection.drop()
operations = []

for i in range(900000):
    operations.append(pymongo.InsertOne({'a': i}))

db.mycollection.bulk_write(operations, ordered=False)
cursor = db.mycollection.find({})
df = pd.DataFrame(list(cursor))
print(df.count())
Belly Buster
  • Your code doesn't write the documents for me, and my dataset is 300× bigger than the dummy data; I have 300 columns. – MPA Feb 02 '20 at 11:03
  • Ok I've expanded the dataset out to 300 fields in the collection and can see where your problem is. – Belly Buster Feb 02 '20 at 15:31
  • where can you see the problem? – MPA Feb 02 '20 at 16:18
  • The problem is, unsurprisingly, the very last line, where you convert the list to the DataFrame. Without your exact data it is difficult to determine exactly, but a 300-field collection will take up about 40 kB per row in the DataFrame. Multiply that through by your 900k records and there's your 32 GB gone in one roll of the dice. Use `sys.getsizeof(df)` to see the size of the df. – Belly Buster Feb 02 '20 at 16:28
  • If you create a dummy df as df = pd.DataFrame(np.random.random_sample((900000, 300))); df['article_id'] = '1.7637874', then df.info() reports only 2 GB+. – MPA Feb 02 '20 at 20:02
  • I suspect, but I don't know, that NumPy series are a lot more memory-optimised than generic MongoDB fields. – Belly Buster Feb 02 '20 at 23:22
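
To see where the memory goes, a small sketch that widens the test harness above to 300 fields on a 10k-document sample and compares the intermediate list of dictionaries with the resulting DataFrame; the field names are hypothetical placeholders.

import sys
import pymongo
import pandas as pd

db = pymongo.MongoClient()['mydatabase']
db.mycollection.drop()

# A 10k-document sample with 300 numeric fields each; scale the results
# up by 90 to estimate the full 900k-row case.
db.mycollection.insert_many(
    [{'f{}'.format(j): float(i) for j in range(300)} for i in range(10000)]
)

docs = list(db.mycollection.find({}))
df = pd.DataFrame(docs)

print(sys.getsizeof(docs))                   # only the list's pointer array
print(sum(sys.getsizeof(d) for d in docs))   # dict overhead (keys/values add more)
print(df.memory_usage(deep=True).sum())      # bytes held by the DataFrame itself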
1

Load the data in chunks.

Using iterator2dataframes from https://stackoverflow.com/a/39446008/12015722

def iterator2dataframes(iterator, chunk_size: int):
  """Turn an iterator into multiple small pandas.DataFrame

  This is a balance between memory and efficiency
  """
  records = []
  frames = []
  for i, record in enumerate(iterator):
    records.append(record)
    if i % chunk_size == chunk_size - 1:
      frames.append(pd.DataFrame(records))
      records = []
  if records:
    frames.append(pd.DataFrame(records))
  return pd.concat(frames)

client = MongoClient(host,port)
collection = client[db_name][collection_name]
cursor = collection.find()

df = iterator2dataframes(cursor, 1000)
ckedar
1

Just wanted to make y'all aware of pymongoarrow, which is officially developed by MongoDB and solves this problem. It can output query results to Arrow tables or pandas DataFrames and is, according to the docs, the preferred way of loading data from MongoDB into pandas. It sure worked like a charm for me!
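
A minimal sketch of what that can look like, assuming pymongoarrow's find_pandas_all API and the connection names from the question; the schema fields are hypothetical.

from pymongo import MongoClient
from pymongoarrow.api import Schema, find_pandas_all

client = MongoClient(host, port)
collection = client[db_name][collection_name]

# An explicit Schema pins the dtypes and limits which fields are read,
# which keeps memory usage predictable. Field names are placeholders.
schema = Schema({'feature_1': float, 'feature_2': float})

df = find_pandas_all(collection, {}, schema=schema)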

Moritz Wilksch
0

You can try to get the data from MongoDB in chunks using slice indexing, i.e. fetch 100,000 documents at a time, add them to a DataFrame, then fetch the next 100,000 documents and append those to the DataFrame.

from pymongo import MongoClient
import pandas as pd

client = MongoClient(host, port)
collection = client[db_name][collection_name]

maxrows = 905679
df = pd.DataFrame()
for i in range(0, maxrows, 100000):
    # Slice the cursor to pull at most 100,000 documents per iteration.
    if i + 100000 < maxrows:
        cursor = collection.find()[i:i + 100000]
    else:
        cursor = collection.find()[i:maxrows]
    df2 = pd.DataFrame(list(cursor))
    df = df.append(df2, ignore_index=True)  # append returns a new DataFrame




Refer to the link below to learn more about slice indexing on PyMongo cursors:

https://api.mongodb.com/python/current/api/pymongo/cursor.html

Anupam Chaplot
0

I have found a solution with multiprocessing, and it is the fastest.

import multiprocessing as mp
import pandas as pd
from pymongo import MongoClient
from time import time

def chunks(collection_size, n_cores=mp.cpu_count()):
    """Yield (start, end) tuples that split the collection between the workers."""
    batch_size = round(collection_size / n_cores)
    rest = collection_size % batch_size
    cumulative = 0
    for i in range(n_cores):
        cumulative += batch_size
        if i == n_cores - 1:
            yield (batch_size * i, cumulative + rest)  # last chunk absorbs the remainder
        else:
            yield (batch_size * i, cumulative)


def parallel_read(skipses, host=HOST, port=PORT):
    print('Starting process on range of {} to {}'.format(skipses[0], skipses[1]))
    client = MongoClient(host, port)
    db = client[DB_NAME]
    collection = db[COLLECTION_NAME]

    cursor = collection.find({}, {'_id': False})
    _df = pd.DataFrame(list(cursor[skipses[0]:skipses[1]]))
    return _df


def read_mongo(colc_size, _workers=mp.cpu_count()):
    pool = mp.Pool(processes=_workers)
    results = [pool.apply_async(parallel_read, args=(chunk,)) for chunk in chunks(colc_size, n_cores=_workers)]
    output = [p.get() for p in results]
    return pd.concat(output)


time_0 = time()
df = read_mongo(get_collection_size())
print("Reading database with {} processes took {}".format(mp.cpu_count(), time() - time_0))



Starting process on range of 0 to 53866
Starting process on range of 323196 to 377062
Starting process on range of 430928 to 484794
Starting process on range of 538660 to 592526
Starting process on range of 377062 to 430928
Starting process on range of 700258 to 754124
Starting process on range of 53866 to 107732
Starting process on range of 484794 to 538660
Starting process on range of 592526 to 646392
Starting process on range of 646392 to 700258
Starting process on range of 215464 to 269330
Starting process on range of 754124 to 807990
Starting process on range of 807990 to 915714
Starting process on range of 107732 to 161598
Starting process on range of 161598 to 215464
Starting process on range of 269330 to 323196

Reading database with 16 processes took 142.64860558509827

With one of the examples above (no multiprocessing):

def iterator2dataframes(iterator, chunk_size: int):
  """Turn an iterator into multiple small pandas.DataFrame

  This is a balance between memory and efficiency
  """
  records = []
  frames = []
  for i, record in enumerate(iterator):
    records.append(record)
    if i % chunk_size == chunk_size - 1:
      frames.append(pd.DataFrame(records))
      records = []
  if records:
    frames.append(pd.DataFrame(records))
  return pd.concat(frames)

time_0 = time()
cursor = collection.find()
chunk_size = 1000
df = iterator2dataframes(cursor, chunk_size)
print("Reading database with chunksize = {} took {}".format(chunk_size,time()-time_0))

Reading database with chunksize = 1000 took 372.1170778274536

time_0 = time()
cursor = collection.find()
chunk_size = 10000
df = iterator2dataframes(cursor, chunk_size)
print("Reading database with chunksize = {} took {}".format(chunk_size,time()-time_0))

Reading database with chunksize = 10000 took 367.02637577056885

MPA