
I am trying to parse a 300 MB CSV file and save it to MongoDB. To do that, I need to convert the CSV file into a list of BsonDocument objects made of key/value pairs; each row in the CSV file becomes a new BsonDocument. After every couple of minutes of parallel testing, I get an OutOfMemoryException on the split operation. I've read this article, which is very interesting, but I couldn't find any practical solution that I could implement for such huge files.

I looked into different CSV helpers, but couldn't find anything that solves this issue.

Any help will be much appreciated.

Don Bar
  • Can you not stream it line by line and represent it in the MongoDB collection as a series of documents where each document is the equivalent of one line of the source CSV file? – Martin Costello Jan 18 '17 at 20:52
  • Are you running in x64 on a 64bit OS? 300MB should be pretty small... – SledgeHammer Jan 18 '17 at 21:03
  • It sounds like you are reading the file into a single huge string, then splitting it. This is bad because the big string will go on the [large object heap](http://stackoverflow.com/q/8951836/3744182). Stream the CSV in instead. You can use the built-in [`TextFieldParser`](https://msdn.microsoft.com/en-us/library/microsoft.visualbasic.fileio.textfieldparser(v=vs.110).aspx) for this. See e.g. [Reading CSV files using C#](http://stackoverflow.com/a/3508572/3744182). – dbc Jan 18 '17 at 21:14
  • I am running on x64 @SledgeHammer, but I think it runs on vshost32.exe, so I am not sure if it's 64-bit. I do load the whole file into a string, so I will try to read line by line instead. But I am not sure if I can create something like a List<...> in order to save the header field and the value of each row. – Don Bar Jan 18 '17 at 21:20

2 Answers


You should be able to read it line by line like this:

public static void Main()
{
    // 'path' points to the CSV file on disk.
    using (StreamReader sr = new StreamReader(path))
    {
        string[] headers = null;
        string[] curLine;
        // ReadLine() returns null at the end of the stream; the null-conditional
        // Split makes the loop exit cleanly instead of throwing.
        while ((curLine = sr.ReadLine()?.Split(',')) != null)
        {
            if (headers == null)
            {
                headers = curLine;
            }
            else
            {
                processLine(headers, curLine);
            }
        }
    }
}

public static void processLine(string[] headers, string[] line)
{
    for (int i = 0; i < headers.Length; i++)
    {
        string header = headers[i];
        string value = line[i];

        //Now you have individual header/value pairs that you can put into mongodb
    }
}

I've never used MongoDB and I don't know the structure of your CSV or your Mongo documents, so I won't be able to help much there. Hopefully you can get there from here, though. If not, edit your post with some more details about how you need to structure your MongoDB data, and hopefully somebody will post a more helpful answer.
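
That said, here is a rough, untested sketch of how those header/value pairs could become a document for MongoDB, assuming the official MongoDB .NET driver (BsonDocument comes from the MongoDB.Bson package; the collection name below is just a placeholder):

// Assumes: using MongoDB.Bson; using MongoDB.Driver;
public static BsonDocument ToDocument(string[] headers, string[] line)
{
    // One document per CSV row: header name -> field value.
    var doc = new BsonDocument();
    for (int i = 0; i < headers.Length; i++)
    {
        doc.Add(headers[i], line[i]);
    }
    return doc;
}

// Usage (collection name "csvRows" is just a placeholder):
// var collection = database.GetCollection<BsonDocument>("csvRows");
// await collection.InsertOneAsync(ToDocument(headers, curLine));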

ashbygeek

Thank you @dbc, that worked! @ashbygeek, I needed to add this to your code so the loop also stops cleanly at the end of the stream:

 while (!sr.EndOfStream && (curLine = sr.ReadLine().Split('\t')) != null)
 {
     //do process
 }

So I am uploading my code, which reads my big CSV file from an Azure blob and inserts into MongoDB in batches instead of one document at a time. I also created my own primary-key hash, plus a unique index on it, in order to detect duplicate documents; if a batch insert fails, I insert its documents one by one in order to identify the duplicate.

I hope it will help someone in the future.

// TextFieldParser comes from the Microsoft.VisualBasic.FileIO namespace.
using (TextFieldParser parser = new TextFieldParser(blockBlob2.OpenRead()))
{
    parser.TextFieldType = FieldType.Delimited;
    parser.SetDelimiters("\t");
    bool headerWritten = false;
    List<BsonDocument> listToInsert = new List<BsonDocument>();
    int chunkSize = 50;
    int counter = 0;
    var headers = new string[0];

    while (!parser.EndOfData)
    {
        // Processing row
        var fields = parser.ReadFields();

        // The first row holds the column names.
        if (!headerWritten)
        {
            headers = fields;
            headerWritten = true;
            continue;
        }

        // One BsonDocument per row: header -> field value.
        listToInsert.Add(new BsonDocument(headers.Zip(fields, (k, v) => new { k, v }).ToDictionary(x => x.k, x => x.v)));
        counter++;

        // Flush every chunkSize rows.
        if (counter != chunkSize) continue;
        AdditionalInformation(listToInsert, dataCollectionQueueMessage);
        CalculateHashForPrimaryKey(listToInsert);
        await InsertDataIntoDB(listToInsert, dataCollectionQueueMessage);
        counter = 0;
        listToInsert.Clear();
    }

    // Insert the last partial batch.
    if (listToInsert.Count > 0)
    {
        AdditionalInformation(listToInsert, dataCollectionQueueMessage);
        CalculateHashForPrimaryKey(listToInsert);
        await InsertDataIntoDB(listToInsert, dataCollectionQueueMessage);
    }
}
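
CalculateHashForPrimaryKey and AdditionalInformation are helpers of mine that aren't shown here. For the hash, a minimal sketch of the idea, assuming a SHA-256 over the row's field values stored in the HashMultipleKey field that the unique index below targets (the exact fields and hashing scheme in my real code may differ):

// Requires: using System.Security.Cryptography; using System.Text; using System.Linq;
private static void CalculateHashForPrimaryKey(List<BsonDocument> documents)
{
    using (var sha = SHA256.Create())
    {
        foreach (var doc in documents)
        {
            // Concatenate the row's values and hash them, then store the hash
            // so the unique index can reject duplicate rows.
            var concatenated = string.Join("|", doc.Values.Select(v => v.ToString()));
            var hash = sha.ComputeHash(Encoding.UTF8.GetBytes(concatenated));
            doc["HashMultipleKey"] = Convert.ToBase64String(hash);
        }
    }
}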



private async Task InsertDataIntoDB(List<BsonDocument> listToInsert, DataCollectionQueueMessage dataCollectionQueueMessage)
{
    const string connectionString = "mongodb://127.0.0.1/localdb";

    var client = new MongoClient(connectionString);

    _database = client.GetDatabase("localdb");

    var collection = _database.GetCollection<BsonDocument>(dataCollectionQueueMessage.CollectionTypeEnum.ToString());

    // Unique, sparse index on the hash field so duplicate rows are rejected.
    await collection.Indexes.CreateOneAsync(new BsonDocument("HashMultipleKey", 1), new CreateIndexOptions() { Unique = true, Sparse = true, });

    try
    {
        await collection.InsertManyAsync(listToInsert);
    }
    catch (Exception ex)
    {
        ApplicationInsights.Instance.TrackException(ex);

        // The batch contained a duplicate; fall back to one-by-one inserts to find it.
        await InsertSingleDocuments(listToInsert, collection, dataCollectionQueueMessage);
    }
}



private async Task InsertSingleDocuments(List<BsonDocument> dataCollectionDict, IMongoCollection<BsonDocument> collection,
    DataCollectionQueueMessage dataCollectionQueueMessage)
{
    ApplicationInsights.Instance.TrackEvent("About to start inserting individual documents and to find the duplicate one");

    foreach (var data in dataCollectionDict)
    {
        try
        {
            await collection.InsertOneAsync(data);
        }
        catch (Exception ex)
        {
            ApplicationInsights.Instance.TrackException(ex, new Dictionary<string, string>() {
                {
                    "Error Message", "Duplicate document was detected, therefore ignoring this document and continuing to insert the next document"
                }, {
                    "FilePath", dataCollectionQueueMessage.FilePath
                }}
            );
        }
    }
}
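
A possible refinement that I didn't do above: the MongoDB .NET driver reports duplicate-key violations as MongoWriteException with a DuplicateKey category, so the fallback loop could skip only those and let any other error bubble up:

foreach (var data in dataCollectionDict)
{
    try
    {
        await collection.InsertOneAsync(data);
    }
    catch (MongoWriteException ex) when (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey)
    {
        // Only duplicate-key violations are skipped; other failures still throw.
        ApplicationInsights.Instance.TrackException(ex, new Dictionary<string, string>() {
            { "Error Message", "Duplicate document was detected, skipping it and continuing with the next one" },
            { "FilePath", dataCollectionQueueMessage.FilePath }
        });
    }
}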
Don Bar