
I want to implement a MapReduce-style algorithm that builds an inverted index for text documents. In the map step, I do something like this:

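import re

letters = ['a']
# Anchored regex for words starting with any of the letters, e.g. ^(a)
regx = re.compile("^(" + "|".join(letters) + ")")
selectedWords = directIndex.aggregate([
    # keep only documents that contain at least one matching word
    { "$match": { "words.word": regx } },
    # one result per element of the words array
    { "$unwind": "$words" },
    # drop the unwound elements whose word does not match
    { "$match": { "words.word": regx } },
    # one result per distinct (word, count, document) triple
    { "$group": { "_id": { "word": "$words.word", "count": "$words.count", "document": "$document" } } }])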
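The following pure-Python sketch mimics the four stages on in-memory documents, which makes the output shape of this pipeline concrete. It assumes each DirectIndex document looks like {'document': ..., 'words': [{'word': ..., 'count': ...}, ...]}. That layout is inferred from the field paths in the pipeline. map_step is a hypothetical helper, not part of pymongo. Unlike MongoDB, which does not guarantee the order of $group results, it keeps results in first-seen order.

```python
import re

def map_step(direct_docs, letters):
    """Mimic $match -> $unwind -> $match -> $group on in-memory documents."""
    regx = re.compile("^(" + "|".join(letters) + ")")
    results = []
    # (the first $match only prunes documents early; it does not change the result)
    for doc in direct_docs:
        # $unwind: visit each element of the words array separately
        for w in doc.get("words", []):
            # $match: keep only words that start with one of the letters
            if regx.search(w["word"]):
                row = {"_id": {"word": w["word"], "count": w["count"], "document": doc["document"]}}
                # $group on the whole triple: duplicates collapse into one result
                if row not in results:
                    results.append(row)
    return results

sample = [
    {"document": "doc1", "words": [{"word": "apple", "count": 2}, {"word": "banana", "count": 1}]},
    {"document": "doc2", "words": [{"word": "ant", "count": 3}]},
]
print(map_step(sample, ["a"]))
```

Each result holds all of its fields under _id. That is why a second step is needed to regroup the results by word.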

Here I select all words that start with the given letter(s), together with their counts and source documents. Then I write this information to another collection:

# aggregate() returns a cursor, so materialize it into a list before storing it
myinvcol.insert_one({'letter': ''.join(letters), 'words': list(selectedWords)})

In the next step I read each inserted document and perform the reduce operation. This builds a dict like {'wordName': {'documents': [(document1, count1), (document2, count2), ...]}, 'wordName2': {'documents': [...]}, ...}, and I then do some additional processing on that dict.
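Here is a minimal sketch of this reduce step. It assumes each stored document has the shape written by the insert_one call above: a 'letter' field plus the list of $group results, each with word/count/document nested under _id. reduce_letter_doc is a hypothetical name, and the further processing on the dict is not shown.

```python
from collections import defaultdict

def reduce_letter_doc(letter_doc):
    """Fold one {'letter': ..., 'words': [...]} document into
    {word: {'documents': [(document, count), ...]}}."""
    index = defaultdict(lambda: {"documents": []})
    for entry in letter_doc["words"]:
        key = entry["_id"]  # the $group stage put word/count/document under _id
        index[key["word"]]["documents"].append((key["document"], key["count"]))
    return dict(index)

stored = {"letter": "a", "words": [
    {"_id": {"word": "apple", "count": 2, "document": "doc1"}},
    {"_id": {"word": "apple", "count": 1, "document": "doc2"}},
    {"_id": {"word": "ant", "count": 5, "document": "doc1"}},
]}
print(reduce_letter_doc(stored))
# {'apple': {'documents': [('doc1', 2), ('doc2', 1)]}, 'ant': {'documents': [('doc1', 5)]}}
```

Against the database, the input would come from something like for doc in myinvcol.find(): reduce_letter_doc(doc).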

Now the interesting part: is it possible to run the first step (the map part, i.e. the aggregation) entirely on the MongoDB server? I know there is a '$out' stage:

letters = ['a']
regx = re.compile("^("+"|".join(letters)+')')
selectedWords = directIndex.aggregate([
    { "$match": { "words.word": regx } },
    { "$unwind": "$words" },
    { "$match": { "words.word": regx } },
    { "$group": { "_id": { "word":"$words.word", "count":"$words.count", 'document' : '$document' } } },
    { "$out" : 'InverseIndex'}])

It lets me write the aggregation result to another collection, but not in the shape I want. Instead of a single document

{'letter': ''.join(letters), 'words': selectedWords}

I get one document per group, each of the form

{ "_id": { "word": "$words.word", "count": "$words.count", "document": "$document" } }

So, is there a way, within the aggregation, to merge all the results into a single array in one document before the $out stage?

Arron Stowne

1 Answer


After some research, I found that this could be a solution:

import re

letters = 'ab'
regx = re.compile("^(" + "|".join(letters) + ")")
# Run the pipeline with pymongo; $out writes the result to InverseIndex itself,
# so nothing needs to be inserted from the client afterwards
mydb.DirectIndex.aggregate([
    { "$match": { "words.word": regx } },
    { "$unwind": "$words" },
    { "$match": { "words.word": regx } },
    { "$group": { "_id": { "word": "$words.word", "count": "$words.count", "document": "$document" } } },
    # "Reverse $unwind": a constant _id puts every result in one group,
    # and $push collects them into a single words array
    { "$group": {
        "_id": letters,
        "words": {
            "$push": {
                "word": "$_id.word",
                "count": "$_id.count",
                "document": "$_id.document"
            }
        }
    }},
    { "$out": "InverseIndex" }
])

(Found in "mongoDB: how to reverse $unwind".) But here there is a catch: $out overwrites the contents of the target collection, so if I run this more than once, the previous result is lost. According to "How do I append Mongo DB aggregation results to an existing collection?", MongoDB 4.2 was expected to add a mode option to $out (mode: "replaceDocuments") that would allow appending new content to a collection. What actually shipped in 4.2 is a separate $merge stage that can insert into or merge with an existing collection. But for now, this is a dead end.
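The sketch below shows how the answer's pipeline could append instead of overwrite. It assumes MongoDB 4.2 or later and the same DirectIndex layout. The final $out is swapped for a $merge stage, which is a different operator from the $out mode option discussed above. letter_pipeline is a hypothetical helper. The grouping key is joined into a string because a stored _id cannot be an array.

```python
import re

def letter_pipeline(letters):
    """Build the answer's pipeline for one group of first letters, ending in $merge (MongoDB 4.2+)."""
    key = "".join(letters)
    regx = re.compile("^(" + "|".join(letters) + ")")
    return [
        {"$match": {"words.word": regx}},
        {"$unwind": "$words"},
        {"$match": {"words.word": regx}},
        {"$group": {"_id": {"word": "$words.word", "count": "$words.count", "document": "$document"}}},
        # Constant _id (the letters string) folds every result into one document
        {"$group": {"_id": key,
                    "words": {"$push": {"word": "$_id.word", "count": "$_id.count",
                                        "document": "$_id.document"}}}},
        # Unlike $out, $merge leaves other documents in InverseIndex untouched:
        # a document with the same _id is replaced, otherwise a new one is inserted
        {"$merge": {"into": "InverseIndex", "on": "_id",
                    "whenMatched": "replace", "whenNotMatched": "insert"}},
    ]
```

Calling mydb.DirectIndex.aggregate(letter_pipeline(letters)) once per letter group would accumulate one document per group in InverseIndex. Because each group ends up as a single document, it is still bounded by MongoDB's 16 MB BSON document size limit.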

I also tried MongoDB's built-in mapReduce, through pymongo's map_reduce:

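from bson.code import Code

# nrDocs: total number of documents (assumed: one DirectIndex document per text document)
nrDocs = mydb.DirectIndex.count_documents({})

# Map: for every word in a document, emit word -> {documents: [entry + document name]}
mape = Code("function () {"
            "  var docName = this.document;"
            "  this.words.forEach(function (z) {"
            "    z['document'] = docName;"
            "    var temp = z.word;"
            "    delete z.word;"
            "    emit(temp, {'documents': [z]});"
            "  });"
            "}")
# Reduce: concatenate the lists; the output has the same shape as the emitted values
reduce = Code("function (key, values) {"
              "  var total = [];"
              "  for (var i = 0; i < values.length; i++) {"
              "    for (var j = 0; j < values[i]['documents'].length; j++) {"
              "      var d = values[i]['documents'][j];"
              "      total.push({'document': d['document'], 'count': d['count'], 'tf': d['tf']});"
              "    }"
              "  }"
              "  return {'documents': total};"
              "}")
# Finalize: attach a smoothed idf based on how many documents contain the word
finalizeFunction = Code("function (key, reducedVal) {"
                        "  if ('documents' in reducedVal) {"
                        "    var normVal = Math.log((1 + " + str(nrDocs) + ") / (1 + 1 + reducedVal.documents.length));"
                        "    reducedVal['idf'] = normVal;"
                        "    return reducedVal;"
                        "  } else { return null; }"
                        "}")
# Collection.map_reduce is only available in PyMongo versions before 4.0
result = mydb.DirectIndex.map_reduce(mape, reduce, {'merge': "InverseIndex"}, finalize=finalizeFunction)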
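The following pure-Python emulation runs the same map, reduce and finalize logic on in-memory documents, which shows what the three JavaScript functions compute. It assumes each word entry carries count and tf fields, since the reduce function copies both. emulate_map_reduce is a hypothetical helper. It does not reproduce MongoDB's incremental re-reduce or the merge into InverseIndex.

```python
import math
from collections import defaultdict

def emulate_map_reduce(direct_docs):
    """Python mirror of the mape / reduce / finalizeFunction logic above."""
    n_docs = len(direct_docs)  # stands in for nrDocs
    # map: emit(word, {'documents': [entry]}) where entry = word fields minus 'word', plus document name
    emitted = defaultdict(list)
    for doc in direct_docs:
        for z in doc["words"]:
            entry = {k: v for k, v in z.items() if k != "word"}
            entry["document"] = doc["document"]
            emitted[z["word"]].append({"documents": [entry]})
    result = {}
    for word, values in emitted.items():
        # reduce: concatenate all emitted lists, keeping document, count and tf
        total = [{"document": d["document"], "count": d["count"], "tf": d["tf"]}
                 for v in values for d in v["documents"]]
        # finalize: idf = ln((1 + N) / (1 + 1 + df)), df = length of the documents list
        result[word] = {"documents": total,
                        "idf": math.log((1 + n_docs) / (1 + 1 + len(total)))}
    return result

docs = [
    {"document": "d1", "words": [{"word": "apple", "count": 2, "tf": 0.5},
                                 {"word": "ant", "count": 2, "tf": 0.5}]},
    {"document": "d2", "words": [{"word": "apple", "count": 1, "tf": 1.0}]},
]
for word, value in emulate_map_reduce(docs).items():
    print(word, value["idf"])
```

With this formula, a word that occurs in every one of the N documents gets idf = ln((1 + N) / (2 + N)), which is slightly negative.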

This does what I need. The downside is speed: compared with my hand-written MapReduce (aggregation plus a Python dict keyed by word), it is much slower. Anyway, if somebody runs into this problem, these are the only two ways I know to solve it.
